Advertisement
4javier

Untitled

Nov 18th, 2016
256
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 67.73 KB | None | 0 0
  1. /*
  2. * Copyright (C) 2014 MediaTek Inc.
  3. * Modification based on code covered by the mentioned copyright
  4. * and/or permission notice(s).
  5. */
  6. /*
  7. * Copyright 2012, The Android Open Source Project
  8. *
  9. * Licensed under the Apache License, Version 2.0 (the "License");
  10. * you may not use this file except in compliance with the License.
  11. * You may obtain a copy of the License at
  12. *
  13. * http://www.apache.org/licenses/LICENSE-2.0
  14. *
  15. * Unless required by applicable law or agreed to in writing, software
  16. * distributed under the License is distributed on an "AS IS" BASIS,
  17. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. * See the License for the specific language governing permissions and
  19. * limitations under the License.
  20. */
  21.  
  22. //#define LOG_NDEBUG 0
  23. #define LOG_TAG "NetworkSession"
  24. #include <utils/Log.h>
  25.  
  26. #include "ANetworkSession.h"
  27. #include "ParsedMessage.h"
  28.  
  29. #include <arpa/inet.h>
  30. #include <fcntl.h>
  31. #include <net/if.h>
  32. #include <linux/tcp.h>
  33. #include <netdb.h>
  34. #include <netinet/in.h>
  35. #include <sys/ioctl.h>
  36. #include <sys/socket.h>
  37. #include <unistd.h>
  38.  
  39. #include <media/stagefright/foundation/ABuffer.h>
  40. #include <media/stagefright/foundation/ADebug.h>
  41. #include <media/stagefright/foundation/AMessage.h>
  42. #include <media/stagefright/foundation/hexdump.h>
  43.  
  44. ///Add by MTK @{
  45. #include <cutils/properties.h>
  46. #include <stdlib.h>
  47. #include <sys/ioctl.h>
  48. #include "WifiDisplayUibcType.h"
  49. #ifdef MTB_SUPPORT
  50. #define ATRACE_TAG ATRACE_TAG_MTK_WFD
  51. #include <utils/Trace.h>
  52. #endif
  53. ///@}
  54. namespace android {
  55.  
  // Decodes the two bytes at ptr[0..1] as a big-endian (most significant byte
  // first) unsigned 16-bit integer.
  static uint16_t U16_AT(const uint8_t *ptr) {
      const uint16_t high = ptr[0];
      const uint16_t low = ptr[1];
      return (high << 8) | low;
  }
  59.  
  // Decodes the four bytes at ptr[0..3] as a big-endian unsigned 32-bit
  // integer.
  //
  // Each byte is widened to uint32_t before shifting: shifting the promoted
  // (signed) int left by 24 would move a set top bit into the sign bit.
  static uint32_t U32_AT(const uint8_t *ptr) {
      return (static_cast<uint32_t>(ptr[0]) << 24)
              | (static_cast<uint32_t>(ptr[1]) << 16)
              | (static_cast<uint32_t>(ptr[2]) << 8)
              | static_cast<uint32_t>(ptr[3]);
  }
  63.  
  64. static uint64_t U64_AT(const uint8_t *ptr) {
  65. return ((uint64_t)U32_AT(ptr)) << 32 | U32_AT(ptr + 4);
  66. }
  67.  
  // Capacity of the buffer handed to recvfrom() for each UDP datagram.
  static const size_t kMaxUDPSize = 1500;
  // Value mUDPRetries is (re)set to; it is decremented on each failed
  // datagram read/write pass until it reaches zero.
  static const int32_t kMaxUDPRetries = 200;

  #define MAX_BUFFER_SIZE 2048 //1024
  // Default for Session::mThresholdCount when the "media.wfd.threshold"
  // property is not set.
  #define THRESHOLD_BUFFER_SIZE 50
  #define WARN_BUFFER_SIZE 400

  ///M : Add for Latency improve issue @{
  #define MAX_READ_COUNT 10
  // Number of header bytes at the start of each RTP packet.
  #define RTP_HEADER_LEN 12
  #define USHORT_MAX 65535


  // Fixed-size wrapper so a whole RTP header can be copied with one struct
  // assignment.
  struct rtp_header_len_s {
      uint8_t buf[RTP_HEADER_LEN];
  };

  /// @}

  ///M : Add for Robust issue @{
  #define MAX_RTSP_TCP_CONN_RETRY 10
  /// @}
  90. /// @}
  91.  
  92.  
  93.  
  94. struct ANetworkSession::NetworkThread : public Thread {
  95. NetworkThread(ANetworkSession *session);
  96.  
  97. protected:
  98. virtual ~NetworkThread();
  99.  
  100. private:
  101. ANetworkSession *mSession;
  102.  
  103. virtual bool threadLoop();
  104.  
  105. DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
  106. };
  107.  
  108. struct ANetworkSession::Session : public RefBase {
  109. enum Mode {
  110. MODE_RTSP,
  111. MODE_DATAGRAM,
  112. MODE_WEBSOCKET,
  113. };
  114.  
  115. enum State {
  116. CONNECTING,
  117. CONNECTED,
  118. LISTENING_RTSP,
  119. LISTENING_TCP_DGRAMS,
  120. DATAGRAM,
  121. ///Add by MTK @{
  122. LISTENING_TCP_TEXT,
  123. LISTENING_TCP_UIBC,
  124. SOCKET_ERROR,
  125. ///@}
  126. };
  127. ///Add by MTK @{
  128. enum TCPType {
  129. TCP_DATAGRAM = 0,
  130. TCP_TEXTDATA,
  131. TCP_UIBC,
  132. TCP_BINDATA // WFD_HDCP_TX_SUPPORT
  133. };
  134. ///@}
  135.  
  136. Session(int32_t sessionID,
  137. State state,
  138. int s,
  139. const sp<AMessage> &notify);
  140.  
  141. int32_t sessionID() const;
  142. int socket() const;
  143. sp<AMessage> getNotificationMessage() const;
  144.  
  145. bool isRTSPServer() const;
  146. bool isTCPDatagramServer() const;
  147.  
  148. bool wantsToRead();
  149. bool wantsToWrite();
  150.  
  151. status_t readMore();
  152. status_t writeMore();
  153.  
  154. status_t sendRequest(
  155. const void *data, ssize_t size, bool timeValid, int64_t timeUs);
  156.  
  157. void setMode(Mode mode);
  158.  
  159. status_t switchToWebSocketMode();
  160.  
  161. ///Add by MTK @{
  162. bool isTCPServer() const;
  163. void setTCPConnectionType(int yesno);
  164. int getTCPConnectionType() const;
  165. void closeSocket();
  166. status_t writeDirectRequest(const void *data, ssize_t size);
  167. status_t mtkRTPRecvPause();
  168. status_t mtkRTPRecvResume();
  169. int64_t getRTPRecvNum();
  170. status_t resetRTPRecvNum();
  171. /// @}
  172. protected:
  173. virtual ~Session();
  174.  
  175. private:
  176. enum {
  177. FRAGMENT_FLAG_TIME_VALID = 1,
  178. };
  179. struct Fragment {
  180. uint32_t mFlags;
  181. int64_t mTimeUs;
  182. sp<ABuffer> mBuffer;
  183. #ifdef MTK_AOSP_ENHANCEMENT
  184. int64_t mInTimeUs;
  185. int64_t mLatencyBeginProfileMs;
  186. int64_t mLatencyToken;
  187. int32_t mIsVideo;
  188. bool mIsDummyVideo;
  189. bool mIsWFDPacket;
  190. bool mIsFirst;
  191. bool mIsLast;
  192. uint32_t mRtpSeqNumber;
  193. #endif
  194. };
  195.  
  196. int32_t mSessionID;
  197. State mState;
  198. Mode mMode;
  199. ///Add by MTK @{
  200. int mTcpType;
  201. ///@}
  202. ///M: Add for sink pause receive RTP data@{
  203. bool mRTPPause;
  204. /// @}
  205. ///M: Add for sink count RTP num @{
  206. int64_t mRTPCounter;
  207. ///@}
  208. ///M: Add for Latency issue
  209. bool fHaveSeenFirstPacket;
  210. uint16_t fNextExpectedSeqNo;
  211. static const int64_t kPrintIntervalUs = 10000000ll; //print time 10s
  212. int64_t mNextPrintTimeUs;
  213.  
  214. int64_t mPreUs;
  215. int64_t mCurUs;
  216. int mtotalSize;
  217.  
  218. /// @}
  219. int mSocket;
  220. sp<AMessage> mNotify;
  221. bool mSawReceiveFailure, mSawSendFailure;
  222. int32_t mUDPRetries;
  223.  
  224. List<Fragment> mOutFragments;
  225.  
  226. AString mInBuffer;
  227.  
  228. int64_t mLastStallReportUs;
  229.  
  230. int mThresholdCount;
  231.  
  232.  
  233. void notifyError(bool send, status_t err, const char *detail);
  234. void notify(NotificationReason reason);
  235.  
  236. void dumpFragmentStats(const Fragment &frag);
  237.  
  238. DISALLOW_EVIL_CONSTRUCTORS(Session);
  239. #ifdef MTK_AOSP_ENHANCEMENT
  240. public:
  241. status_t sendWFDRequest(List<sp<ABuffer> > &packets, int64_t timeUs = -1ll);
  242. #endif
  243. };
  244. ////////////////////////////////////////////////////////////////////////////////
  245.  
  246. ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
  247. : mSession(session) {
  248. }
  249.  
  250. ANetworkSession::NetworkThread::~NetworkThread() {
  251. }
  252.  
  253. bool ANetworkSession::NetworkThread::threadLoop() {
  254. mSession->threadLoop();
  255.  
  256. return true;
  257. }
  258.  
  259. ////////////////////////////////////////////////////////////////////////////////
  260.  
  261. ANetworkSession::Session::Session(
  262. int32_t sessionID,
  263. State state,
  264. int s,
  265. const sp<AMessage> &notify)
  266. : mSessionID(sessionID),
  267. mState(state),
  268. mMode(MODE_DATAGRAM),
  269. ///Add by MTK @{
  270. mTcpType(TCP_DATAGRAM),
  271. mRTPPause(false),
  272. //mRTPCounter(0),
  273. fHaveSeenFirstPacket(false),
  274. //mNextPrintTimeUs(-1),
  275. mPreUs(0ll),
  276. mCurUs(0ll),
  277. mtotalSize(0),
  278. ///@}
  279. mSocket(s),
  280. mNotify(notify),
  281. mSawReceiveFailure(false),
  282. mSawSendFailure(false),
  283. mUDPRetries(kMaxUDPRetries),
  284. mLastStallReportUs(-1ll) {
  285. ALOGI("A new session:%d-%d", sessionID, state);
  286. if (mState == CONNECTED) {
  287. struct sockaddr_in localAddr;
  288. socklen_t localAddrLen = sizeof(localAddr);
  289.  
  290. int res = getsockname(
  291. mSocket, (struct sockaddr *)&localAddr, (socklen_t*)&localAddrLen);
  292. CHECK_GE(res, 0);
  293.  
  294. struct sockaddr_in remoteAddr;
  295. socklen_t remoteAddrLen = sizeof(remoteAddr);
  296.  
  297. res = getpeername(
  298. mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
  299. CHECK_GE(res, 0);
  300.  
  301. in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
  302. AString localAddrString = StringPrintf(
  303. "%d.%d.%d.%d",
  304. (addr >> 24),
  305. (addr >> 16) & 0xff,
  306. (addr >> 8) & 0xff,
  307. addr & 0xff);
  308.  
  309. addr = ntohl(remoteAddr.sin_addr.s_addr);
  310. AString remoteAddrString = StringPrintf(
  311. "%d.%d.%d.%d",
  312. (addr >> 24),
  313. (addr >> 16) & 0xff,
  314. (addr >> 8) & 0xff,
  315. addr & 0xff);
  316.  
  317. sp<AMessage> msg = mNotify->dup();
  318. msg->setInt32("sessionID", mSessionID);
  319. msg->setInt32("reason", kWhatClientConnected);
  320. msg->setString("server-ip", localAddrString.c_str());
  321. msg->setInt32("server-port", ntohs(localAddr.sin_port));
  322. msg->setString("client-ip", remoteAddrString.c_str());
  323. msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
  324. msg->post();
  325. }
  326.  
  327.  
  328. char val[PROPERTY_VALUE_MAX];
  329. if (property_get("media.wfd.threshold", val, NULL)) {
  330. mThresholdCount = atoi(val);
  331. }else{
  332. mThresholdCount = THRESHOLD_BUFFER_SIZE;
  333. }
  334.  
  335. ALOGI("mThresholdCount:%d", mThresholdCount);
  336. }
  337.  
  338. ANetworkSession::Session::~Session() {
  339. ALOGI("Session %d gone", mSessionID);
  340.  
  341. ///M: Modify to close socket @{
  342. if(mSocket != -1){
  343. close(mSocket);
  344. mSocket = -1;
  345. }
  346. /// @}
  347. }
  348.  
  349. int32_t ANetworkSession::Session::sessionID() const {
  350. return mSessionID;
  351. }
  352.  
  353. int ANetworkSession::Session::socket() const {
  354. return mSocket;
  355. }
  356.  
  357. void ANetworkSession::Session::setMode(Mode mode) {
  358. mMode = mode;
  359. }
  360.  
  361. status_t ANetworkSession::Session::switchToWebSocketMode() {
  362. if (mState != CONNECTED || mMode != MODE_RTSP) {
  363. return INVALID_OPERATION;
  364. }
  365.  
  366. mMode = MODE_WEBSOCKET;
  367.  
  368. return OK;
  369. }
  370.  
  371. sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
  372. return mNotify;
  373. }
  374.  
  375. bool ANetworkSession::Session::isRTSPServer() const {
  376. return mState == LISTENING_RTSP;
  377. }
  378.  
  379. bool ANetworkSession::Session::isTCPDatagramServer() const {
  380. return mState == LISTENING_TCP_DGRAMS;
  381. }
  382.  
  383. bool ANetworkSession::Session::wantsToRead() {
  384. return !mSawReceiveFailure && mState != CONNECTING;
  385. }
  386.  
  387. bool ANetworkSession::Session::wantsToWrite() {
  388. return !mSawSendFailure
  389. && (mState == CONNECTING
  390. || (mState == CONNECTED && !mOutFragments.empty())
  391. || (mState == DATAGRAM && !mOutFragments.empty()));
  392. }
  393.  
  394. status_t ANetworkSession::Session::readMore() {
  395. if (mState == DATAGRAM) {
  396. CHECK_EQ(mMode, MODE_DATAGRAM);
  397.  
  398. status_t err;
  399.  
  400. /// M : Latency improve issue @{
  401. #if 1
  402. uint16_t fSeqNo = 0;
  403. bool fMbitIs1 = false;
  404.  
  405. do {
  406. sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
  407.  
  408. uint8_t *fTmpBuffer = buf->base();
  409.  
  410. struct sockaddr_in remoteAddr;
  411. socklen_t remoteAddrLen = sizeof(remoteAddr);
  412.  
  413. ssize_t n;
  414. do {
  415. n = recvfrom(
  416. mSocket, buf->data(), buf->capacity(), 0,
  417. (struct sockaddr *)&remoteAddr, &remoteAddrLen);
  418.  
  419. } while (n < 0 && errno == EINTR);
  420.  
  421. err = OK;
  422. if (n < 0) {
  423. err = -errno;
  424. } else if (n == 0) {
  425. err = -ECONNRESET;
  426. }else {
  427. ///M : Add for pause RTP data to sink @{
  428. if (mRTPPause){
  429. err = OK;
  430. fHaveSeenFirstPacket = false;
  431. break;
  432. }
  433. ///@ }
  434.  
  435. fSeqNo = U16_AT(&fTmpBuffer[2]);
  436. fMbitIs1 = false;
  437. //Mbit is flag for last packet of frame
  438. if((fTmpBuffer[1] >> 7) & 0x1)
  439. {
  440. fMbitIs1 = true;
  441. }
  442.  
  443. if (!fHaveSeenFirstPacket)
  444. {
  445. fNextExpectedSeqNo = fSeqNo + 1;
  446. fHaveSeenFirstPacket = true;
  447. }
  448. else
  449. {
  450. if (fNextExpectedSeqNo != fSeqNo)
  451. {
  452. uint32_t diff = (fSeqNo - fNextExpectedSeqNo + USHORT_MAX - 1) % (USHORT_MAX -1);
  453. ///M : drop gap is large or old packet
  454. if(diff >60000)
  455. {
  456. ALOGI("Wrong order, Recv SeqNo = %u, Expect SeqNo = %u\n",
  457. fSeqNo, fNextExpectedSeqNo);
  458. continue;
  459. }
  460.  
  461. ALOGI("Recv SeqNo = %u, Expect SeqNo = %u, Packet lost = %d\n",
  462. fSeqNo, fNextExpectedSeqNo, diff);
  463.  
  464.  
  465. }
  466. fNextExpectedSeqNo = fSeqNo + 1;
  467. }
  468.  
  469. buf->setRange(0, n);
  470.  
  471. int64_t nowUs = ALooper::GetNowUs();
  472. buf->meta()->setInt64("arrivalTimeUs", nowUs);
  473. ///M: Add for Latency issue, print time log
  474. if(mNextPrintTimeUs < 0ll || nowUs >= mNextPrintTimeUs)
  475. {
  476. buf->meta()->setInt32("printTime",1);
  477. mNextPrintTimeUs = nowUs + kPrintIntervalUs;
  478. }
  479. else
  480. {
  481. buf->meta()->setInt32("printTime",0);
  482. }
  483. if(fMbitIs1)
  484. {
  485. buf->meta()->setInt32("Mbit",1);
  486. }
  487. else
  488. {
  489. buf->meta()->setInt32("Mbit",0);
  490. }
  491.  
  492. sp<AMessage> notify = mNotify->dup();
  493. notify->setInt32("sessionID", mSessionID);
  494. notify->setInt32("reason", kWhatDatagram);
  495.  
  496. uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
  497. notify->setString(
  498. "fromAddr",
  499. StringPrintf(
  500. "%u.%u.%u.%u",
  501. ip >> 24,
  502. (ip >> 16) & 0xff,
  503. (ip >> 8) & 0xff,
  504. ip & 0xff).c_str());
  505.  
  506. notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
  507.  
  508. notify->setBuffer("data", buf);
  509. notify->post();
  510. //Count RTP number received
  511. mRTPCounter ++;
  512.  
  513. mtotalSize += n;
  514. mCurUs = ALooper::GetNowUs();
  515. if (mPreUs==0ll)
  516. {
  517. mPreUs = mCurUs;
  518. ALOGI("ANetworkSession Bandwidth=0.0 (KB/s) at %lld Ms", mPreUs/1000);
  519. } else {
  520. if (mCurUs-mPreUs>=1000000) {
  521. int _ms = (mCurUs-mPreUs)/1000;
  522.  
  523. ALOGI("ANetworkSession Bandwidth=%2.2f (KB/s)",
  524. ((float)mtotalSize*1000/_ms)/1024);
  525.  
  526. // update for next
  527. mPreUs = mCurUs;
  528. mtotalSize = 0;
  529. }
  530. }
  531.  
  532. }
  533. } while (err == OK);
  534. #else
  535.  
  536. int32_t fReadCount = 0;
  537. sp<ABuffer> buf = new ABuffer(kMaxUDPSize*MAX_READ_COUNT);
  538. uint8_t *fTmpBuffer = buf->base();
  539. uint8_t arHeader[RTP_HEADER_LEN] = {0};
  540. uint8_t rtpHeader[RTP_HEADER_LEN] = {0};
  541. ssize_t fFrameSize = 0;
  542. uint16_t fSeqNo = 0;
  543. bool fMbitIs1 = false;
  544.  
  545. struct sockaddr_in remoteAddr;
  546. socklen_t remoteAddrLen = sizeof(remoteAddr);
  547.  
  548. do {
  549. ssize_t n;
  550. do {
  551. n = recvfrom(
  552. mSocket, fTmpBuffer, kMaxUDPSize, 0,
  553. (struct sockaddr *)&remoteAddr, &remoteAddrLen);
  554.  
  555. } while (n < 0 && errno == EINTR);
  556.  
  557. err = OK;
  558. if (n < 0) {
  559. err = -errno;
  560. } else if (n == 0) {
  561. err = -ECONNRESET;
  562. }else {
  563. ///M : Add for pause RTP data to sink @{
  564. if (mRTPPause){
  565. err = OK;
  566. fHaveSeenFirstPacket = false;
  567. break;
  568. }
  569. ///@ }
  570. //ALOGD("Receive size is %d",n);
  571. fSeqNo = U16_AT(&fTmpBuffer[2]);
  572.  
  573. fMbitIs1 = false;
  574. //Mbit is flag for last packet of frame
  575. if((fTmpBuffer[1] >> 7) & 0x1)
  576. {
  577. fMbitIs1 = true;
  578. }
  579.  
  580. if (!fHaveSeenFirstPacket)
  581. {
  582. fNextExpectedSeqNo = fSeqNo + 1;
  583. fHaveSeenFirstPacket = true;
  584. }
  585. else
  586. {
  587. if (fNextExpectedSeqNo != fSeqNo)
  588. {
  589. uint32_t diff = (fSeqNo - fNextExpectedSeqNo + USHORT_MAX - 1) % (USHORT_MAX -1);
  590. ///M : drop gap is large or old packet
  591. if(diff >60000)
  592. {
  593. ALOGI("Wrong order, Recv SeqNo = %u, Expect SeqNo = %u\n",
  594. fSeqNo, fNextExpectedSeqNo);
  595. continue;
  596. }
  597.  
  598. ALOGI("Recv SeqNo = %u, Expect SeqNo = %u, Packet lost = %d\n",
  599. fSeqNo, fNextExpectedSeqNo, diff);
  600.  
  601.  
  602. }
  603. fNextExpectedSeqNo = fSeqNo + 1;
  604. }
  605.  
  606. ++fReadCount;
  607. if(fReadCount == 1)
  608. {
  609. *(rtp_header_len_s*)rtpHeader = *(rtp_header_len_s*)fTmpBuffer;
  610. }
  611. *(rtp_header_len_s*)fTmpBuffer = *(rtp_header_len_s*)arHeader;
  612. fTmpBuffer += (n - RTP_HEADER_LEN);
  613. fFrameSize += (n - RTP_HEADER_LEN);
  614. *(rtp_header_len_s*)arHeader = *(rtp_header_len_s*)fTmpBuffer;
  615.  
  616.  
  617. }
  618. } while (err == OK && fReadCount < MAX_READ_COUNT && !fMbitIs1 );
  619.  
  620. //ALOGD("Receive %d Packet, size is %d",fReadCount,fFrameSize+12);
  621.  
  622. if(fReadCount >= 1)
  623. {
  624. fTmpBuffer -= fFrameSize;
  625. /// Add rtp header
  626. *(rtp_header_len_s*)fTmpBuffer = *(rtp_header_len_s*)rtpHeader;
  627.  
  628. buf->setRange(0,fFrameSize+12);
  629. int64_t nowUs = ALooper::GetNowUs();
  630. buf->meta()->setInt64("arrivalTimeUs", nowUs);
  631.  
  632. ///M: Add for Latency issue, print time log
  633. if(mNextPrintTimeUs < 0ll || nowUs >= mNextPrintTimeUs)
  634. {
  635. buf->meta()->setInt32("printTime",1);
  636. mNextPrintTimeUs = nowUs + kPrintIntervalUs;
  637. }
  638. else
  639. {
  640. buf->meta()->setInt32("printTime",0);
  641. }
  642. if(fMbitIs1)
  643. {
  644. buf->meta()->setInt32("Mbit",1);
  645. }
  646. else
  647. {
  648. buf->meta()->setInt32("Mbit",0);
  649. }
  650.  
  651.  
  652. sp<AMessage> notify = mNotify->dup();
  653. notify->setInt32("sessionID", mSessionID);
  654. notify->setInt32("reason", kWhatDatagram);
  655.  
  656. uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
  657. notify->setString(
  658. "fromAddr",
  659. StringPrintf(
  660. "%u.%u.%u.%u",
  661. ip >> 24,
  662. (ip >> 16) & 0xff,
  663. (ip >> 8) & 0xff,
  664. ip & 0xff).c_str());
  665.  
  666. notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
  667.  
  668. notify->setBuffer("data", buf);
  669. notify->post();
  670. //Count RTP number received
  671. mRTPCounter ++;
  672.  
  673. }
  674.  
  675. #endif
  676.  
  677. /// @}
  678. if (err == -EAGAIN) {
  679. err = OK;
  680. }
  681.  
  682. if (err != OK) {
  683. if (!mUDPRetries) {
  684. notifyError(false /* send */, err, "Recvfrom failed.");
  685. mSawReceiveFailure = true;
  686. } else {
  687. mUDPRetries--;
  688. ALOGE("Recvfrom failed, %d/%d retries left",
  689. mUDPRetries, kMaxUDPRetries);
  690. err = OK;
  691. }
  692. } else {
  693. mUDPRetries = kMaxUDPRetries;
  694. }
  695.  
  696. return err;
  697. }
  698.  
  699. ///Modify by MTK @{
  700. char tmp[530]; // for AKE_Send_Cert len = 524 bytes + AKE_Receiver_info = 6 bytes
  701. ///
  702. //char tmp[512];
  703. ssize_t n;
  704. do {
  705. n = recv(mSocket, tmp, sizeof(tmp), 0);
  706. } while (n < 0 && errno == EINTR);
  707.  
  708. status_t err = OK;
  709.  
  710. if (n > 0) {
  711. mInBuffer.append(tmp, n);
  712.  
  713. ///Add by MTK @{
  714. if (mTcpType == Session::TCP_BINDATA)
  715. {
  716. char v[PROPERTY_VALUE_MAX];
  717. if (property_get("media.stagefright_wfd.hdcp.dump", v, NULL)
  718. && (!strcmp(v, "1") ))
  719. {
  720. ALOGD("in:");
  721. hexdump(tmp, n);
  722. }
  723. }
  724. ///@}
  725. } else if (n < 0) {
  726. err = -errno;
  727. } else {
  728. err = -ECONNRESET;
  729. }
  730.  
  731. if (mMode == MODE_DATAGRAM) {
  732. // TCP stream carrying 16-bit length-prefixed datagrams.
  733. ///Add by MTK @{
  734. if(mTcpType == Session::TCP_DATAGRAM){
  735. ///@}
  736. while (mInBuffer.size() >= 2) {
  737. size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
  738.  
  739. if (mInBuffer.size() < packetSize + 2) {
  740. break;
  741. }
  742.  
  743. sp<ABuffer> packet = new ABuffer(packetSize);
  744. memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
  745.  
  746. int64_t nowUs = ALooper::GetNowUs();
  747. packet->meta()->setInt64("arrivalTimeUs", nowUs);
  748.  
  749. sp<AMessage> notify = mNotify->dup();
  750. notify->setInt32("sessionID", mSessionID);
  751. notify->setInt32("reason", kWhatDatagram);
  752. notify->setBuffer("data", packet);
  753. notify->post();
  754.  
  755. mInBuffer.erase(0, packetSize + 2);
  756. }
  757. }
  758. ///Add by MTK @{
  759. else if(mTcpType == Session::TCP_BINDATA)
  760. {
  761. sp<ABuffer> packet = new ABuffer(mInBuffer.size());
  762. memcpy(packet->data(), mInBuffer.c_str(), mInBuffer.size());
  763.  
  764. sp<AMessage> notify = mNotify->dup();
  765. notify->setInt32("sessionID", mSessionID);
  766. notify->setInt32("reason", kWhatBinaryData);
  767. notify->setBuffer("data", packet);
  768. notify->post();
  769. mInBuffer.clear();
  770. }
  771. ///@}
  772. else if(mTcpType == Session::TCP_TEXTDATA){
  773. sp<AMessage> notify = mNotify->dup();
  774.  
  775. notify->setInt32("sessionID", mSessionID);
  776. notify->setInt32("reason", kWhatTextData);
  777. notify->setString("data", mInBuffer.c_str());
  778. notify->post();
  779.  
  780. mInBuffer.clear();
  781. ///Add by MTK @{
  782. }else if(mTcpType == Session::TCP_UIBC){
  783. ALOGD("TCP_UIBC");
  784. while (mInBuffer.size() >= 4) {
  785. ALOGD("UIBC remain buffer size:%d", mInBuffer.size());
  786. const char* pBuffer = mInBuffer.c_str();
  787. ALOGD("Buffer:0x%02x:0x%02x:0x%02x:0x%02x", pBuffer[0], pBuffer[1], pBuffer[2], pBuffer[3]);
  788. size_t packetSize = (pBuffer[2]<<8) + pBuffer[3];
  789. if(packetSize<=0) {
  790. ALOGD("packet size error :%d", packetSize);
  791. mInBuffer.clear();
  792. break;
  793. }
  794. if (mInBuffer.size() < packetSize) {
  795. break;
  796. } else {
  797. sp<ABuffer> packet = new ABuffer(packetSize);
  798. memcpy(packet->data(), mInBuffer.c_str(), packetSize);
  799.  
  800. sp<AMessage> notify = mNotify->dup();
  801. notify->setInt32("sessionID", mSessionID);
  802. notify->setInt32("reason", kWhatUibcData);
  803. notify->setBuffer("data", packet);
  804. notify->post();
  805.  
  806. mInBuffer.erase(0, packetSize);
  807. }
  808. }
  809. }
  810. ///@}
  811. } else if (mMode == MODE_RTSP) {
  812. for (;;) {
  813. size_t length;
  814.  
  815. if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
  816. if (mInBuffer.size() < 4) {
  817. break;
  818. }
  819.  
  820. length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
  821.  
  822. if (mInBuffer.size() < 4 + length) {
  823. break;
  824. }
  825.  
  826. sp<AMessage> notify = mNotify->dup();
  827. notify->setInt32("sessionID", mSessionID);
  828. notify->setInt32("reason", kWhatBinaryData);
  829. notify->setInt32("channel", mInBuffer.c_str()[1]);
  830.  
  831. sp<ABuffer> data = new ABuffer(length);
  832. memcpy(data->data(), mInBuffer.c_str() + 4, length);
  833.  
  834. int64_t nowUs = ALooper::GetNowUs();
  835. data->meta()->setInt64("arrivalTimeUs", nowUs);
  836.  
  837. notify->setBuffer("data", data);
  838. notify->post();
  839.  
  840. mInBuffer.erase(0, 4 + length);
  841. continue;
  842. }
  843.  
  844. sp<ParsedMessage> msg =
  845. ParsedMessage::Parse(
  846. mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
  847.  
  848. if (msg == NULL) {
  849. break;
  850. }
  851.  
  852. sp<AMessage> notify = mNotify->dup();
  853. notify->setInt32("sessionID", mSessionID);
  854. notify->setInt32("reason", kWhatData);
  855. notify->setObject("data", msg);
  856. notify->post();
  857.  
  858. #if 1
  859. // XXX The (old) dongle sends the wrong content length header on a
  860. // SET_PARAMETER request that signals a "wfd_idr_request".
  861. // (17 instead of 19).
  862. const char *content = msg->getContent();
  863. if (content
  864. && !memcmp(content, "wfd_idr_request\r\n", 17)
  865. && length >= 19
  866. && mInBuffer.c_str()[length] == '\r'
  867. && mInBuffer.c_str()[length + 1] == '\n') {
  868. length += 2;
  869. }
  870. #endif
  871.  
  872. mInBuffer.erase(0, length);
  873.  
  874. if (err != OK) {
  875. break;
  876. }
  877. }
  878. } else {
  879. CHECK_EQ(mMode, MODE_WEBSOCKET);
  880.  
  881. const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
  882. // hexdump(data, mInBuffer.size());
  883.  
  884. while (mInBuffer.size() >= 2) {
  885. size_t offset = 2;
  886.  
  887. unsigned payloadLen = data[1] & 0x7f;
  888. if (payloadLen == 126) {
  889. if (offset + 2 > mInBuffer.size()) {
  890. break;
  891. }
  892.  
  893. payloadLen = U16_AT(&data[offset]);
  894. offset += 2;
  895. } else if (payloadLen == 127) {
  896. if (offset + 8 > mInBuffer.size()) {
  897. break;
  898. }
  899.  
  900. payloadLen = U64_AT(&data[offset]);
  901. offset += 8;
  902. }
  903.  
  904. uint32_t mask = 0;
  905. if (data[1] & 0x80) {
  906. // MASK==1
  907. if (offset + 4 > mInBuffer.size()) {
  908. break;
  909. }
  910.  
  911. mask = U32_AT(&data[offset]);
  912. offset += 4;
  913. }
  914.  
  915. if (offset + payloadLen > mInBuffer.size()) {
  916. break;
  917. }
  918.  
  919. // We have the full message.
  920.  
  921. sp<ABuffer> packet = new ABuffer(payloadLen);
  922. memcpy(packet->data(), &data[offset], payloadLen);
  923.  
  924. if (mask != 0) {
  925. for (size_t i = 0; i < payloadLen; ++i) {
  926. packet->data()[i] =
  927. data[offset + i]
  928. ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
  929. }
  930. }
  931.  
  932. sp<AMessage> notify = mNotify->dup();
  933. notify->setInt32("sessionID", mSessionID);
  934. notify->setInt32("reason", kWhatWebSocketMessage);
  935. notify->setBuffer("data", packet);
  936. notify->setInt32("headerByte", data[0]);
  937. notify->post();
  938.  
  939. mInBuffer.erase(0, offset + payloadLen);
  940. }
  941. }
  942.  
  943. if (err != OK) {
  944. notifyError(false /* send */, err, "Recv failed.");
  945. mSawReceiveFailure = true;
  946. }
  947.  
  948. return err;
  949. }
  950.  
  951. void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) {
  952. #if 0
  953. int64_t nowUs = ALooper::GetNowUs();
  954. int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
  955.  
  956. static const int64_t kMinDelayMs = 0;
  957. static const int64_t kMaxDelayMs = 300;
  958.  
  959. const char *kPattern = "########################################";
  960. size_t kPatternSize = strlen(kPattern);
  961.  
  962. int n = (kPatternSize * (delayMs - kMinDelayMs))
  963. / (kMaxDelayMs - kMinDelayMs);
  964.  
  965. if (n < 0) {
  966. n = 0;
  967. } else if ((size_t)n > kPatternSize) {
  968. n = kPatternSize;
  969. }
  970.  
  971. ALOGI("[%lld]: (%4lld ms) %s\n",
  972. frag.mTimeUs / 1000,
  973. delayMs,
  974. kPattern + kPatternSize - n);
  975. #endif
  976.  
  977. #ifdef MTK_AOSP_ENHANCEMENT
  978. int64_t nowUs = ALooper::GetNowUs();
  979. if( frag.mIsWFDPacket ) {
  980. ALOGI("[WFD_P][%s][dummy=%d]ts=%lld ms,in %lld ms,out %lld ms, mLatencyF %lld ms,send cost %lld ms,LatencyT %lld ms",
  981. ( frag.mIsVideo?"video":"audo"), frag.mIsDummyVideo,
  982. frag.mTimeUs/1000ll,frag.mInTimeUs/1000ll,nowUs/1000ll ,
  983. (frag.mLatencyBeginProfileMs > 0ll ?(frag.mInTimeUs/1000ll - frag.mLatencyBeginProfileMs):-1),
  984. ((nowUs - frag.mInTimeUs)/1000ll),
  985. (frag.mLatencyBeginProfileMs > 0ll ? nowUs/1000ll - frag.mLatencyBeginProfileMs:-1));
  986. if(frag.mIsDummyVideo){
  987. #ifdef MTB_SUPPORT
  988. //ATRACE_ASYNC_END("SND-WIFI", frag.mLatencyToken);
  989.  
  990. #endif
  991. }
  992. }
  993. #else
  994. /* Fix build warining */
  995. Fragment dummy = frag;
  996. #endif
  997.  
  998. }
  999. status_t ANetworkSession::Session::writeMore() {
  1000. if (mState == DATAGRAM) {
  1001. CHECK(!mOutFragments.empty());
  1002.  
  1003. status_t err;
  1004. do {
  1005. const Fragment &frag = *mOutFragments.begin();
  1006. const sp<ABuffer> &datagram = frag.mBuffer;
  1007.  
  1008. int n;
  1009.  
  1010. #ifdef MTK_AOSP_ENHANCEMENT
  1011. int64_t startTimeUs = ALooper::GetNowUs();
  1012. #ifdef MTB_SUPPORT
  1013. if(frag.mIsDummyVideo && frag.mIsFirst){
  1014. ATRACE_ASYNC_BEGIN("SND-SKT", frag.mLatencyToken);
  1015. //ATRACE_INT("SND-SKT:SeqNo", frag.mRtpSeqNumber);
  1016. uint16_t RtpSeq16Bit = frag.mRtpSeqNumber & 0xFFFFU;
  1017. ATRACE_INT("SND-SKT:SeqNo", RtpSeq16Bit);
  1018. }
  1019.  
  1020. if(frag.mIsDummyVideo && frag.mIsLast){
  1021. ATRACE_ASYNC_END("SND-SKT", frag.mLatencyToken);
  1022. uint16_t RtpSeq16Bit = frag.mRtpSeqNumber & 0xFFFFU;
  1023. //ATRACE_INT("SND-SKT:SeqNo", frag.mRtpSeqNumber);
  1024. ATRACE_INT("SND-SKT:SeqNo", RtpSeq16Bit);
  1025. }
  1026. #endif
  1027. #endif
  1028.  
  1029. do {
  1030. n = send(mSocket, datagram->data(), datagram->size(), 0);
  1031. } while (n < 0 && errno == EINTR);
  1032.  
  1033. #ifdef MTK_AOSP_ENHANCEMENT
  1034. int64_t endTimeUs = ALooper::GetNowUs();
  1035.  
  1036. if(endTimeUs - startTimeUs > 2000ll){
  1037. ALOGI("[Latency]Send a datagram more than %lld ms,left %d frags",
  1038. (endTimeUs - startTimeUs)/1000,mOutFragments.size());
  1039. }
  1040.  
  1041. #endif
  1042. err = OK;
  1043.  
  1044. if (n > 0) {
  1045. if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
  1046. dumpFragmentStats(frag);
  1047. }
  1048.  
  1049. mOutFragments.erase(mOutFragments.begin());
  1050. } else if (n < 0) {
  1051. err = -errno;
  1052. } else if (n == 0) {
  1053. err = -ECONNRESET;
  1054. }
  1055. } while (err == OK && !mOutFragments.empty());
  1056.  
  1057. if (err == -EAGAIN) {
  1058. if (!mOutFragments.empty()) {
  1059.  
  1060. int numBytesQueued;
  1061.  
  1062. // res = 0: query succeed
  1063. int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
  1064.  
  1065. ALOGI("[Latency]%d datagrams remain queued. BytesInSocket = %d/%d", mOutFragments.size(), numBytesQueued, res);
  1066.  
  1067. }
  1068. err = OK;
  1069. }
  1070.  
  1071. if (err != OK) {
  1072. if (!mUDPRetries) {
  1073. notifyError(true /* send */, err, "Send datagram failed.");
  1074. mSawSendFailure = true;
  1075. } else {
  1076. mUDPRetries--;
  1077. ALOGE("Send datagram failed, %d/%d retries left",
  1078. mUDPRetries, kMaxUDPRetries);
  1079. err = OK;
  1080. }
  1081. } else {
  1082. mUDPRetries = kMaxUDPRetries;
  1083. }
  1084.  
  1085. return err;
  1086. }
  1087.  
  1088. if (mState == CONNECTING) {
  1089. int err;
  1090. socklen_t optionLen = sizeof(err);
  1091. CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, (socklen_t*)&optionLen), 0);
  1092. CHECK_EQ(optionLen, (socklen_t)sizeof(err));
  1093.  
  1094. if (err != 0) {
  1095. notifyError(kWhatError, -err, "Connection failed");
  1096. mSawSendFailure = true;
  1097.  
  1098. return -err;
  1099. }
  1100.  
  1101. mState = CONNECTED;
  1102. notify(kWhatConnected);
  1103.  
  1104. return OK;
  1105. }
  1106.  
  1107. CHECK_EQ(mState, CONNECTED);
  1108. CHECK(!mOutFragments.empty());
  1109.  
  1110. ssize_t n = -1;
  1111. while (!mOutFragments.empty()) {
  1112. const Fragment &frag = *mOutFragments.begin();
  1113.  
  1114. do {
  1115. n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
  1116. } while (n < 0 && errno == EINTR);
  1117.  
  1118. if (n <= 0) {
  1119. break;
  1120. }
  1121.  
  1122. frag.mBuffer->setRange(
  1123. frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
  1124.  
  1125. if (frag.mBuffer->size() > 0) {
  1126. break;
  1127. }
  1128.  
  1129. if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
  1130. dumpFragmentStats(frag);
  1131. }
  1132.  
  1133. mOutFragments.erase(mOutFragments.begin());
  1134. }
  1135.  
  1136. status_t err = OK;
  1137.  
  1138. if (n < 0) {
  1139. err = -errno;
  1140. } else if (n == 0) {
  1141. err = -ECONNRESET;
  1142. }
  1143.  
  1144. if (err != OK) {
  1145. notifyError(true /* send */, err, "Send failed.");
  1146. mSawSendFailure = true;
  1147. }
  1148.  
  1149. #if 0
  1150. int numBytesQueued;
  1151. int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
  1152. if (res == 0 && numBytesQueued > 50 * 1024) {
  1153. if (numBytesQueued > 409600) {
  1154. ALOGW("!!! numBytesQueued = %d", numBytesQueued);
  1155. }
  1156.  
  1157. int64_t nowUs = ALooper::GetNowUs();
  1158.  
  1159. if (mLastStallReportUs < 0ll
  1160. || nowUs > mLastStallReportUs + 100000ll) {
  1161. sp<AMessage> msg = mNotify->dup();
  1162. msg->setInt32("sessionID", mSessionID);
  1163. msg->setInt32("reason", kWhatNetworkStall);
  1164. msg->setSize("numBytesQueued", numBytesQueued);
  1165. msg->post();
  1166.  
  1167. mLastStallReportUs = nowUs;
  1168. }
  1169. }
  1170. #endif
  1171.  
  1172. return err;
  1173. }
  1174.  
  1175. #ifdef MTK_AOSP_ENHANCEMENT
  1176. status_t ANetworkSession::Session::sendWFDRequest(
  1177. List<sp<ABuffer> > &packets, int64_t timeUs ){
  1178.  
  1179. CHECK(mState == CONNECTED || mState == DATAGRAM);
  1180.  
  1181. while(!packets.empty()){
  1182.  
  1183. const sp<ABuffer> &packet = *(packets.begin());
  1184. const void *data = packet->data();
  1185. int32_t size = packet->size();
  1186. int32_t isLastPacket = 0;
  1187. int32_t isFirstPacket = 0;
  1188. int64_t latencyB = -1;
  1189. int64_t LatencyToken = -1;
  1190. int32_t isVideo = 0;
  1191. int32_t isDummy = 0;
  1192. packet->meta()->findInt32("isLast", &isLastPacket);
  1193. packet->meta()->findInt32("isFirst", &isFirstPacket);
  1194. packet->meta()->findInt32("isVideo", &isVideo);
  1195. packet->meta()->findInt64("latencyB", &latencyB);
  1196. packet->meta()->findInt64("LatencyToken", &LatencyToken);
  1197. packet->meta()->findInt32("isDummy", &isDummy);
  1198.  
  1199.  
  1200.  
  1201. if (size < 0) {
  1202. size = strlen((const char *)data);
  1203. }
  1204.  
  1205. if (size == 0) {
  1206. continue;
  1207. }
  1208. sp<ABuffer> buffer;
  1209. if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
  1210. ///Add by MTK @{
  1211. if (mTcpType == TCP_DATAGRAM){
  1212. ///@}
  1213. CHECK_LE(size, 65535);
  1214.  
  1215. buffer = new ABuffer(size + 2);
  1216. buffer->data()[0] = size >> 8;
  1217. buffer->data()[1] = size & 0xff;
  1218. memcpy(buffer->data() + 2, data, size);
  1219. }
  1220. else if (mTcpType == TCP_BINDATA) {
  1221. buffer = new ABuffer(size);
  1222. memcpy(buffer->data(), data, size);
  1223. }
  1224. } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
  1225. static const bool kUseMask = false; // Chromium doesn't like it.
  1226.  
  1227. size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
  1228. if (size > 65535) {
  1229. numHeaderBytes += 8;
  1230. } else if (size > 125) {
  1231. numHeaderBytes += 2;
  1232. }
  1233.  
  1234. buffer = new ABuffer(numHeaderBytes + size);
  1235. buffer->data()[0] = 0x81; // FIN==1 | opcode=1 (text)
  1236. buffer->data()[1] = kUseMask ? 0x80 : 0x00;
  1237.  
  1238. if (size > 65535) {
  1239. buffer->data()[1] |= 127;
  1240. buffer->data()[2] = 0x00;
  1241. buffer->data()[3] = 0x00;
  1242. buffer->data()[4] = 0x00;
  1243. buffer->data()[5] = 0x00;
  1244. buffer->data()[6] = (size >> 24) & 0xff;
  1245. buffer->data()[7] = (size >> 16) & 0xff;
  1246. buffer->data()[8] = (size >> 8) & 0xff;
  1247. buffer->data()[9] = size & 0xff;
  1248. } else if (size > 125) {
  1249. buffer->data()[1] |= 126;
  1250. buffer->data()[2] = (size >> 8) & 0xff;
  1251. buffer->data()[3] = size & 0xff;
  1252. } else {
  1253. buffer->data()[1] |= size;
  1254. }
  1255.  
  1256. if (kUseMask) {
  1257. uint32_t mask = rand();
  1258.  
  1259. buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
  1260. buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
  1261. buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
  1262. buffer->data()[numHeaderBytes - 1] = mask & 0xff;
  1263.  
  1264. for (size_t i = 0; i < (size_t)size; ++i) {
  1265. buffer->data()[numHeaderBytes + i] =
  1266. ((const uint8_t *)data)[i]
  1267. ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
  1268. }
  1269. } else {
  1270. memcpy(buffer->data() + numHeaderBytes, data, size);
  1271. }
  1272. } else {
  1273. buffer = new ABuffer(size);
  1274. memcpy(buffer->data(), data, size);
  1275. }
  1276.  
  1277. Fragment frag;
  1278.  
  1279. frag.mFlags = 0;
  1280. frag.mBuffer = buffer;
  1281. frag.mTimeUs = timeUs;
  1282. frag.mIsVideo = isVideo;
  1283. frag.mIsDummyVideo = isDummy;
  1284. frag.mIsWFDPacket = true;
  1285. frag.mRtpSeqNumber = packet->int32Data();
  1286. if(isFirstPacket == 1){
  1287. frag.mIsFirst = true;
  1288. }
  1289. if(isLastPacket == 1){
  1290. frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
  1291. frag.mInTimeUs = ALooper::GetNowUs();
  1292. frag.mLatencyBeginProfileMs = latencyB;
  1293. frag.mLatencyToken = LatencyToken;
  1294. frag.mIsLast = true;
  1295. if(isDummy){
  1296. #ifdef MTB_SUPPORT
  1297. ATRACE_ASYNC_END("VENC-SND", frag.mLatencyToken);
  1298. //ATRACE_ASYNC_BEGIN("SND-WIFI", frag.mLatencyToken);
  1299. #endif
  1300. }
  1301. }
  1302. mOutFragments.push_back(frag);
  1303. packets.erase(packets.begin());
  1304.  
  1305. }
  1306.  
  1307. return OK;
  1308. }
  1309.  
  1310.  
  1311. #endif
  1312.  
  1313. status_t ANetworkSession::Session::sendRequest(
  1314. const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
  1315. CHECK(mState == CONNECTED || mState == DATAGRAM);
  1316.  
  1317. if (size < 0) {
  1318. size = strlen((const char *)data);
  1319. }
  1320.  
  1321. if (size == 0) {
  1322. return OK;
  1323. }
  1324.  
  1325. sp<ABuffer> buffer;
  1326. if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
  1327. ///Add by MTK @{
  1328. if (mTcpType == TCP_DATAGRAM){
  1329. ///@}
  1330. CHECK_LE(size, 65535);
  1331.  
  1332. buffer = new ABuffer(size + 2);
  1333. buffer->data()[0] = size >> 8;
  1334. buffer->data()[1] = size & 0xff;
  1335. memcpy(buffer->data() + 2, data, size);
  1336. }
  1337. else if (mTcpType == TCP_BINDATA) {
  1338. buffer = new ABuffer(size);
  1339. memcpy(buffer->data(), data, size);
  1340. }
  1341. } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
  1342. static const bool kUseMask = false; // Chromium doesn't like it.
  1343.  
  1344. size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
  1345. if (size > 65535) {
  1346. numHeaderBytes += 8;
  1347. } else if (size > 125) {
  1348. numHeaderBytes += 2;
  1349. }
  1350.  
  1351. buffer = new ABuffer(numHeaderBytes + size);
  1352. buffer->data()[0] = 0x81; // FIN==1 | opcode=1 (text)
  1353. buffer->data()[1] = kUseMask ? 0x80 : 0x00;
  1354.  
  1355. if (size > 65535) {
  1356. buffer->data()[1] |= 127;
  1357. buffer->data()[2] = 0x00;
  1358. buffer->data()[3] = 0x00;
  1359. buffer->data()[4] = 0x00;
  1360. buffer->data()[5] = 0x00;
  1361. buffer->data()[6] = (size >> 24) & 0xff;
  1362. buffer->data()[7] = (size >> 16) & 0xff;
  1363. buffer->data()[8] = (size >> 8) & 0xff;
  1364. buffer->data()[9] = size & 0xff;
  1365. } else if (size > 125) {
  1366. buffer->data()[1] |= 126;
  1367. buffer->data()[2] = (size >> 8) & 0xff;
  1368. buffer->data()[3] = size & 0xff;
  1369. } else {
  1370. buffer->data()[1] |= size;
  1371. }
  1372.  
  1373. if (kUseMask) {
  1374. uint32_t mask = rand();
  1375.  
  1376. buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
  1377. buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
  1378. buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
  1379. buffer->data()[numHeaderBytes - 1] = mask & 0xff;
  1380.  
  1381. for (size_t i = 0; i < (size_t)size; ++i) {
  1382. buffer->data()[numHeaderBytes + i] =
  1383. ((const uint8_t *)data)[i]
  1384. ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
  1385. }
  1386. } else {
  1387. memcpy(buffer->data() + numHeaderBytes, data, size);
  1388. }
  1389. } else {
  1390. buffer = new ABuffer(size);
  1391. memcpy(buffer->data(), data, size);
  1392. }
  1393.  
  1394. Fragment frag;
  1395.  
  1396. frag.mFlags = 0;
  1397. if (timeValid) {
  1398. frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
  1399. frag.mTimeUs = timeUs;
  1400. #ifdef MTK_AOSP_ENHANCEMENT
  1401. frag.mInTimeUs = ALooper::GetNowUs();
  1402. #endif
  1403. }
  1404.  
  1405. frag.mBuffer = buffer;
  1406.  
  1407. mOutFragments.push_back(frag);
  1408.  
  1409. return OK;
  1410. }
  1411.  
  1412. status_t ANetworkSession::Session::writeDirectRequest(const void *data_in, ssize_t size) {
  1413. CHECK(mState == CONNECTED || mState == DATAGRAM || mState == SOCKET_ERROR);
  1414.  
  1415. if (mState == DATAGRAM || mState == CONNECTED) {
  1416. CHECK_GE(size, 0);
  1417.  
  1418. ssize_t n;
  1419. status_t err = OK;
  1420. int retry = 0;
  1421.  
  1422. uint8_t *data = (uint8_t*) data_in;
  1423. /*
  1424. if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
  1425. int64_t nowUs = ALooper::GetNowUs();
  1426.  
  1427. uint32_t prevRtpTime = U32_AT(&data[4]);
  1428.  
  1429. // 90kHz time scale
  1430. uint32_t rtpTime = (nowUs * 9ll) / 100ll;
  1431.  
  1432. //ALOGV("correcting rtpTime by %.0f ms", diffTime / 90.0);
  1433.  
  1434. data[4] = rtpTime >> 24;
  1435. data[5] = (rtpTime >> 16) & 0xff;
  1436. data[6] = (rtpTime >> 8) & 0xff;
  1437. data[7] = rtpTime & 0xff;
  1438. }
  1439. */
  1440.  
  1441. int numBytesQueued;
  1442. int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
  1443.  
  1444.  
  1445. if (res == 0 && numBytesQueued > mThresholdCount * 1024) {
  1446. if (numBytesQueued > WARN_BUFFER_SIZE * 1024) {
  1447. ALOGW("!!! numBytesQueued = %d", numBytesQueued);
  1448. }
  1449.  
  1450. int64_t nowUs = ALooper::GetNowUs();
  1451.  
  1452. if (mLastStallReportUs < 0ll
  1453. || nowUs > mLastStallReportUs + 100000ll) {
  1454. sp<AMessage> msg = mNotify->dup();
  1455. msg->setInt32("sessionID", mSessionID);
  1456. msg->setInt32("reason", kWhatNetworkStall);
  1457. msg->setSize("numBytesQueued", numBytesQueued);
  1458. msg->post();
  1459.  
  1460. mLastStallReportUs = nowUs;
  1461. }
  1462. }
  1463.  
  1464. do {
  1465. n = send(mSocket, data, size, 0);
  1466. retry++;
  1467. if((retry % 10) == 0){
  1468. ALOGI("retry:%d", retry);
  1469. if(retry > 10 * 1000){
  1470. ALOGE("Fail to send");
  1471. break;
  1472. }
  1473. usleep(10 * 1000);
  1474. }
  1475. } while (n < 0 && (errno == EINTR || errno == EAGAIN || errno == ECONNREFUSED));
  1476.  
  1477. if (n < 0) {
  1478. err = -errno;
  1479. } else if (n == 0) {
  1480. err = -ECONNRESET;
  1481. }
  1482.  
  1483. if (err != OK) {
  1484. if(mState == DATAGRAM){
  1485. notifyError(true /* send */, err, "Send datagram failed.");
  1486. }else if(mState == CONNECTED){
  1487. notifyError(true /* send */, err, "Send failed.");
  1488. }
  1489. mState = SOCKET_ERROR;
  1490. mSawSendFailure = true;
  1491. }
  1492.  
  1493. return err;
  1494. }
  1495.  
  1496. return OK;
  1497. }
  1498. void ANetworkSession::Session::notifyError(
  1499. bool send, status_t err, const char *detail) {
  1500. sp<AMessage> msg = mNotify->dup();
  1501. msg->setInt32("sessionID", mSessionID);
  1502. msg->setInt32("reason", kWhatError);
  1503. msg->setInt32("send", send);
  1504. msg->setInt32("err", err);
  1505. msg->setString("detail", detail);
  1506. msg->post();
  1507. }
  1508.  
  1509. void ANetworkSession::Session::notify(NotificationReason reason) {
  1510. sp<AMessage> msg = mNotify->dup();
  1511. msg->setInt32("sessionID", mSessionID);
  1512. msg->setInt32("reason", reason);
  1513. msg->post();
  1514. }
  1515.  
  1516. ////////////////////////////////////////////////////////////////////////////////
  1517. ANetworkSession::ANetworkSession()
  1518. : mNextSessionID(1){
  1519. mPipeFd[0] = mPipeFd[1] = -1;
  1520. mTestMode = false;
  1521. }
  1522.  
  1523. ANetworkSession::~ANetworkSession() {
  1524. stop();
  1525. }
  1526.  
  1527. status_t ANetworkSession::start() {
  1528. if (mThread != NULL) {
  1529. return INVALID_OPERATION;
  1530. }
  1531.  
  1532. int res = pipe(mPipeFd);
  1533. if (res != 0) {
  1534. mPipeFd[0] = mPipeFd[1] = -1;
  1535. return -errno;
  1536. }
  1537.  
  1538. mThread = new NetworkThread(this);
  1539.  
  1540. status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
  1541.  
  1542. if (err != OK) {
  1543. mThread.clear();
  1544.  
  1545. close(mPipeFd[0]);
  1546. close(mPipeFd[1]);
  1547. mPipeFd[0] = mPipeFd[1] = -1;
  1548.  
  1549. return err;
  1550. }
  1551. return OK;
  1552. }
  1553.  
  1554. status_t ANetworkSession::stop() {
  1555. if (mThread == NULL) {
  1556. return INVALID_OPERATION;
  1557. }
  1558.  
  1559. mThread->requestExit();
  1560. interrupt();
  1561. mThread->requestExitAndWait();
  1562.  
  1563. mThread.clear();
  1564.  
  1565. close(mPipeFd[0]);
  1566. close(mPipeFd[1]);
  1567. mPipeFd[0] = mPipeFd[1] = -1;
  1568. return OK;
  1569. }
  1570.  
  1571. status_t ANetworkSession::createRTSPClient(
  1572. const char *host, unsigned port, const sp<AMessage> &notify,
  1573. int32_t *sessionID) {
  1574. return createClientOrServer(
  1575. kModeCreateRTSPClient,
  1576. NULL /* addr */,
  1577. 0 /* port */,
  1578. host,
  1579. port,
  1580. notify,
  1581. sessionID);
  1582. }
  1583.  
  1584. status_t ANetworkSession::createRTSPServer(
  1585. const struct in_addr &addr, unsigned port,
  1586. const sp<AMessage> &notify, int32_t *sessionID) {
  1587. return createClientOrServer(
  1588. kModeCreateRTSPServer,
  1589. &addr,
  1590. port,
  1591. NULL /* remoteHost */,
  1592. 0 /* remotePort */,
  1593. notify,
  1594. sessionID);
  1595. }
  1596.  
  1597. status_t ANetworkSession::createUDPSession(
  1598. unsigned localPort,
  1599. const sp<AMessage> &notify,
  1600. int32_t *sessionID) {
  1601. return createUDPSession(localPort, NULL, 0, notify, sessionID);
  1602. }
  1603.  
  1604. status_t ANetworkSession::createUDPSession(
  1605. unsigned localPort,
  1606. const char *remoteHost,
  1607. unsigned remotePort,
  1608. const sp<AMessage> &notify,
  1609. int32_t *sessionID) {
  1610. return createClientOrServer(
  1611. kModeCreateUDPSession,
  1612. NULL /* addr */,
  1613. localPort,
  1614. remoteHost,
  1615. remotePort,
  1616. notify,
  1617. sessionID);
  1618. }
  1619.  
  1620. ///Add by MTK @{
  1621. status_t ANetworkSession::createTCPBinaryDataSessionActive(
  1622. unsigned localPort,
  1623. const char *remoteHost,
  1624. unsigned remotePort,
  1625. const sp<AMessage> &notify,
  1626. int32_t *sessionID) {
  1627.  
  1628. ALOGI("%s", __FUNCTION__);
  1629.  
  1630. return createClientOrServer(
  1631. kModeCreateTCPBinaryDataSessionActive,
  1632. NULL /* addr */,
  1633. localPort,
  1634. remoteHost,
  1635. remotePort,
  1636. notify,
  1637. sessionID);
  1638. }
  1639. ///@}
  1640.  
  1641. status_t ANetworkSession::createTCPDatagramSession(
  1642. const struct in_addr &addr, unsigned port,
  1643. const sp<AMessage> &notify, int32_t *sessionID) {
  1644. return createClientOrServer(
  1645. kModeCreateTCPDatagramSessionPassive,
  1646. &addr,
  1647. port,
  1648. NULL /* remoteHost */,
  1649. 0 /* remotePort */,
  1650. notify,
  1651. sessionID);
  1652. }
  1653.  
  1654. status_t ANetworkSession::createTCPDatagramSession(
  1655. unsigned localPort,
  1656. const char *remoteHost,
  1657. unsigned remotePort,
  1658. const sp<AMessage> &notify,
  1659. int32_t *sessionID) {
  1660. return createClientOrServer(
  1661. kModeCreateTCPDatagramSessionActive,
  1662. NULL /* addr */,
  1663. localPort,
  1664. remoteHost,
  1665. remotePort,
  1666. notify,
  1667. sessionID);
  1668. }
  1669.  
  1670. status_t ANetworkSession::destroySession(int32_t sessionID) {
  1671. Mutex::Autolock autoLock(mLock);
  1672.  
  1673. ssize_t index = mSessions.indexOfKey(sessionID);
  1674.  
  1675. if (index < 0) {
  1676. return -ENOENT;
  1677. }
  1678.  
  1679. ///M: Close the socekt immediately @{
  1680. const sp<Session> session = mSessions.valueAt(index);
  1681. session->closeSocket();
  1682. /// @}
  1683. mSessions.removeItemsAt(index);
  1684.  
  1685. interrupt();
  1686.  
  1687. return OK;
  1688. }
  1689.  
  1690. // static
  1691. status_t ANetworkSession::MakeSocketNonBlocking(int s) {
  1692. int flags = fcntl(s, F_GETFL, 0);
  1693. if (flags < 0) {
  1694. flags = 0;
  1695. }
  1696.  
  1697. int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
  1698. if (res < 0) {
  1699. return -errno;
  1700. }
  1701.  
  1702. return OK;
  1703. }
  1704.  
  1705. status_t ANetworkSession::MakeSocketBlocking(int s) {
  1706. int flags = fcntl(s, F_GETFL, 0);
  1707. if (flags < 0) {
  1708. flags = 0;
  1709. }
  1710.  
  1711. int res = fcntl(s, F_SETFL, flags &(~O_NONBLOCK));
  1712. if (res < 0) {
  1713. return -errno;
  1714. }
  1715.  
  1716. return OK;
  1717. }
  1718.  
  1719. status_t ANetworkSession::createClientOrServer(
  1720. Mode mode,
  1721. const struct in_addr *localAddr,
  1722. unsigned port,
  1723. const char *remoteHost,
  1724. unsigned remotePort,
  1725. const sp<AMessage> &notify,
  1726. int32_t *sessionID) {
  1727. Mutex::Autolock autoLock(mLock);
  1728.  
  1729. *sessionID = 0;
  1730. status_t err = OK;
  1731. int s, res;
  1732. sp<Session> session;
  1733.  
  1734. ALOGI("createClientOrServer: mode:%d", mode);
  1735. s = socket(
  1736. AF_INET,
  1737. (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
  1738. 0);
  1739.  
  1740. if (s < 0) {
  1741. err = -errno;
  1742. ALOGE("Error in createClientOrServer:%d", err);
  1743. goto bail;
  1744. }
  1745.  
  1746. if (mode == kModeCreateRTSPServer
  1747. ///Add by MTK @{
  1748. || mode == kModeCreateTCPDatagramSessionPassive
  1749. || mode == kModeCreateTCPTextDataSessionPassive
  1750. || mode == kModeCreateUIBCServer
  1751. || mode == kModeCreateUDPSession) {
  1752. ///@}
  1753. const int yes = 1;
  1754. ALOGD("Set socket resue:%d", yes);
  1755. res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  1756.  
  1757. if (res < 0) {
  1758. err = -errno;
  1759. goto bail2;
  1760. }
  1761. ///Add by MTK @{
  1762. struct linger so_linger;
  1763. so_linger.l_onoff = true;
  1764. so_linger.l_linger = 0;
  1765. ALOGD("Set socket linger:%d", so_linger.l_onoff);
  1766. res = setsockopt(s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
  1767.  
  1768. int flag = 1;
  1769. res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
  1770.  
  1771. ///@}
  1772. }
  1773.  
  1774. if (mode == kModeCreateUDPSession) {
  1775. #ifdef MTK_AOSP_ENHANCEMENT
  1776. int size = MAX_BUFFER_SIZE * 1024;
  1777. #else
  1778. int size = 256 * 1024;
  1779. #endif
  1780. int sendbuff;
  1781. socklen_t optlen = sizeof(sendbuff);
  1782. res = getsockopt(s, SOL_SOCKET, SO_SNDBUF, &sendbuff, (socklen_t*)&optlen);
  1783. ALOGI("original socket buffer size: %d", sendbuff);
  1784.  
  1785. res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
  1786.  
  1787. if (res < 0) {
  1788. err = -errno;
  1789. goto bail2;
  1790. }
  1791.  
  1792. res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
  1793.  
  1794. if (res < 0) {
  1795. err = -errno;
  1796. goto bail2;
  1797. }
  1798.  
  1799.  
  1800. res = getsockopt(s, SOL_SOCKET, SO_SNDBUF, &sendbuff, (socklen_t*)&optlen);
  1801. ALOGI("after socket buffer size: %d", sendbuff);
  1802.  
  1803. //Configure QoS priority for UDP/RTP packets
  1804. int opt;
  1805. int priority;
  1806. priority = 5; /* 5: VI 7: VO */
  1807. opt = priority << 5;
  1808.  
  1809. res = setsockopt(s, SOL_IP, IP_TOS, &opt, sizeof(opt));
  1810. if (res < 0) {
  1811. err = -errno;
  1812. ALOGD("Socket IP_TOS option:%d", err);
  1813. }
  1814.  
  1815. opt = priority;
  1816. res = setsockopt(s, SOL_SOCKET, SO_PRIORITY, &opt, sizeof(opt));
  1817. if (res < 0) {
  1818. err = -errno;
  1819. ALOGD("Socket SO_PRIORITY option:%d", err);
  1820. }
  1821. } else if (mode == kModeCreateTCPDatagramSessionActive) {
  1822. int flag = 1;
  1823. res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
  1824.  
  1825. if (res < 0) {
  1826. err = -errno;
  1827. goto bail2;
  1828. }
  1829.  
  1830. int tos = 224; // VOICE
  1831. res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
  1832.  
  1833. if (res < 0) {
  1834. err = -errno;
  1835. goto bail2;
  1836. }
  1837. }
  1838.  
  1839. err = MakeSocketNonBlocking(s);
  1840.  
  1841. if (err != OK) {
  1842. goto bail2;
  1843. }
  1844.  
  1845. struct sockaddr_in addr;
  1846. memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
  1847. addr.sin_family = AF_INET;
  1848.  
  1849. if (mode == kModeCreateRTSPClient
  1850. || mode == kModeCreateTCPDatagramSessionActive
  1851. || mode == kModeCreateUIBCClient
  1852. ///Add by MTK @{
  1853. || mode == kModeCreateTCPBinaryDataSessionActive
  1854. ///@}
  1855. )
  1856. {
  1857. struct hostent *ent= gethostbyname(remoteHost);
  1858. if (ent == NULL) {
  1859. err = -h_errno;
  1860. goto bail2;
  1861. }
  1862.  
  1863. addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
  1864. addr.sin_port = htons(remotePort);
  1865. ///Add by MTK @{
  1866. } else if (localAddr != NULL && (mode != kModeCreateRTSPServer &&
  1867. mode != kModeCreateTCPTextDataSessionPassive &&
  1868. mode != kModeCreateUIBCServer)) {
  1869. ///@}
  1870. addr.sin_addr = *localAddr;
  1871. addr.sin_port = htons(port);
  1872. ALOGI("Host info %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
  1873. } else {
  1874. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  1875. addr.sin_port = htons(port);
  1876. }
  1877.  
  1878. if (mode == kModeCreateRTSPClient
  1879. || mode == kModeCreateTCPDatagramSessionActive
  1880. || mode == kModeCreateUIBCClient
  1881. ///Add by MTK @{
  1882. || mode == kModeCreateTCPBinaryDataSessionActive
  1883. ///@}
  1884. )
  1885. {
  1886. in_addr_t x = ntohl(addr.sin_addr.s_addr);
  1887. ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
  1888. s,
  1889. (x >> 24),
  1890. (x >> 16) & 0xff,
  1891. (x >> 8) & 0xff,
  1892. x & 0xff,
  1893. ntohs(addr.sin_port));
  1894.  
  1895. res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
  1896. CHECK_LT(res, 0);
  1897. if (errno == EINPROGRESS) {
  1898. res = 0;
  1899. }
  1900. } else {
  1901. res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
  1902. ALOGI("Bind is Done");
  1903.  
  1904. if (res == 0) {
  1905. ///Add by MTK @{
  1906. if (mode == kModeCreateRTSPServer
  1907. || mode == kModeCreateTCPDatagramSessionPassive
  1908. || mode == kModeCreateTCPTextDataSessionPassive
  1909. || mode == kModeCreateUIBCServer) {
  1910. ALOGI("socket listen");
  1911. ///@}
  1912. res = listen(s, 4);
  1913. } else {
  1914. CHECK_EQ(mode, kModeCreateUDPSession);
  1915.  
  1916. if (remoteHost != NULL) {
  1917. struct sockaddr_in remoteAddr;
  1918. memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
  1919. remoteAddr.sin_family = AF_INET;
  1920. remoteAddr.sin_port = htons(remotePort);
  1921.  
  1922. struct hostent *ent= gethostbyname(remoteHost);
  1923. if (ent == NULL) {
  1924. err = -h_errno;
  1925. goto bail2;
  1926. }
  1927.  
  1928. remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
  1929.  
  1930. res = connect(
  1931. s,
  1932. (const struct sockaddr *)&remoteAddr,
  1933. sizeof(remoteAddr));
  1934. }
  1935. }
  1936. }
  1937. }
  1938.  
  1939. if (res < 0) {
  1940. err = -errno;
  1941. goto bail2;
  1942. }
  1943.  
  1944. Session::State state;
  1945. switch (mode) {
  1946. case kModeCreateRTSPClient:
  1947. case kModeCreateUIBCClient:
  1948. state = Session::CONNECTING;
  1949. break;
  1950.  
  1951. case kModeCreateTCPDatagramSessionActive:
  1952. ///Add by MTK @{
  1953. case kModeCreateTCPBinaryDataSessionActive:
  1954. ///@}
  1955. state = Session::CONNECTING;
  1956. break;
  1957.  
  1958. case kModeCreateTCPDatagramSessionPassive:
  1959. state = Session::LISTENING_TCP_DGRAMS;
  1960. break;
  1961.  
  1962. case kModeCreateRTSPServer:
  1963. state = Session::LISTENING_RTSP;
  1964. break;
  1965.  
  1966. ///Add by MTK @{
  1967. case kModeCreateTCPTextDataSessionPassive:
  1968. state = Session::LISTENING_TCP_TEXT;
  1969. break;
  1970.  
  1971. case kModeCreateUIBCServer:
  1972. state = Session::LISTENING_TCP_UIBC;
  1973. break;
  1974. ///@}
  1975. default:
  1976. CHECK_EQ(mode, kModeCreateUDPSession);
  1977. state = Session::DATAGRAM;
  1978. break;
  1979. }
  1980.  
  1981. session = new Session(
  1982. mNextSessionID++,
  1983. state,
  1984. s,
  1985. notify);
  1986.  
  1987. if (mode == kModeCreateTCPDatagramSessionActive) {
  1988. session->setMode(Session::MODE_DATAGRAM);
  1989. } else if (mode == kModeCreateRTSPClient) {
  1990. session->setMode(Session::MODE_RTSP);
  1991. ///Add by MTK @{
  1992. } else if (mode == kModeCreateTCPTextDataSessionPassive) {
  1993. session->setTCPConnectionType(Session::TCP_TEXTDATA);
  1994. } else if (mode == kModeCreateUIBCServer) {
  1995. session->setTCPConnectionType(Session::TCP_UIBC);
  1996. } else if (mode == kModeCreateTCPBinaryDataSessionActive) {
  1997. session->setTCPConnectionType(Session::TCP_BINDATA);
  1998. session->setMode(Session::MODE_DATAGRAM);
  1999. }
  2000. ///@}
  2001.  
  2002. mSessions.add(session->sessionID(), session);
  2003.  
  2004. interrupt();
  2005.  
  2006. *sessionID = session->sessionID();
  2007.  
  2008. goto bail;
  2009.  
  2010. bail2:
  2011. ALOGE("Error in createClientOrServer:%d", err);
  2012. close(s);
  2013. s = -1;
  2014.  
  2015. bail:
  2016. return err;
  2017. }
  2018.  
  2019. status_t ANetworkSession::connectUDPSession(
  2020. int32_t sessionID, const char *remoteHost, unsigned remotePort) {
  2021. Mutex::Autolock autoLock(mLock);
  2022.  
  2023. ssize_t index = mSessions.indexOfKey(sessionID);
  2024.  
  2025. if (index < 0) {
  2026. return -ENOENT;
  2027. }
  2028.  
  2029. const sp<Session> session = mSessions.valueAt(index);
  2030. int s = session->socket();
  2031.  
  2032. struct sockaddr_in remoteAddr;
  2033. memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
  2034. remoteAddr.sin_family = AF_INET;
  2035. remoteAddr.sin_port = htons(remotePort);
  2036.  
  2037. status_t err = OK;
  2038. struct hostent *ent = gethostbyname(remoteHost);
  2039. if (ent == NULL) {
  2040. err = -h_errno;
  2041. } else {
  2042. remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
  2043.  
  2044. int res = connect(
  2045. s,
  2046. (const struct sockaddr *)&remoteAddr,
  2047. sizeof(remoteAddr));
  2048.  
  2049. if (res < 0) {
  2050. err = -errno;
  2051. }
  2052. }
  2053.  
  2054. return err;
  2055. }
  2056.  
  2057. #ifdef MTK_AOSP_ENHANCEMENT
  2058. status_t ANetworkSession::sendWFDRequest(
  2059. int32_t sessionID, List<sp<ABuffer> > &packets, int64_t timeUs)
  2060. {
  2061. Mutex::Autolock autoLock(mLock);
  2062.  
  2063. ssize_t index = mSessions.indexOfKey(sessionID);
  2064.  
  2065. if (index < 0) {
  2066. return -ENOENT;
  2067. }
  2068.  
  2069. const sp<Session> session = mSessions.valueAt(index);
  2070. status_t err = session->sendWFDRequest(packets, timeUs);
  2071.  
  2072.  
  2073. interrupt();
  2074.  
  2075. return err;
  2076. }
  2077. #endif
  2078. status_t ANetworkSession::sendRequest(
  2079. int32_t sessionID, const void *data, ssize_t size,
  2080. bool timeValid, int64_t timeUs
  2081. ) {
  2082. Mutex::Autolock autoLock(mLock);
  2083.  
  2084. ssize_t index = mSessions.indexOfKey(sessionID);
  2085.  
  2086. if (index < 0) {
  2087. return -ENOENT;
  2088. }
  2089.  
  2090. const sp<Session> session = mSessions.valueAt(index);
  2091. status_t err = session->sendRequest(data, size, timeValid, timeUs);
  2092.  
  2093. interrupt();
  2094.  
  2095. return err;
  2096. }
  2097.  
  2098. status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
  2099. Mutex::Autolock autoLock(mLock);
  2100.  
  2101. ssize_t index = mSessions.indexOfKey(sessionID);
  2102.  
  2103. if (index < 0) {
  2104. return -ENOENT;
  2105. }
  2106.  
  2107. const sp<Session> session = mSessions.valueAt(index);
  2108. return session->switchToWebSocketMode();
  2109. }
  2110.  
  2111. void ANetworkSession::interrupt() {
  2112. static const char dummy = 0;
  2113.  
  2114. ssize_t n;
  2115. do {
  2116. n = write(mPipeFd[1], &dummy, 1);
  2117. } while (n < 0 && errno == EINTR);
  2118.  
  2119. if (n < 0) {
  2120. ALOGW("Error writing to pipe (%s)", strerror(errno));
  2121. }
  2122. }
  2123.  
  2124. void ANetworkSession::threadLoop() {
  2125. fd_set rs, ws;
  2126. FD_ZERO(&rs);
  2127. FD_ZERO(&ws);
  2128.  
  2129. FD_SET(mPipeFd[0], &rs);
  2130. int maxFd = mPipeFd[0];
  2131.  
  2132. {
  2133. Mutex::Autolock autoLock(mLock);
  2134.  
  2135. for (size_t i = 0; i < mSessions.size(); ++i) {
  2136. const sp<Session> &session = mSessions.valueAt(i);
  2137.  
  2138. int s = session->socket();
  2139.  
  2140. if (s < 0) {
  2141. continue;
  2142. }
  2143.  
  2144. if (session->wantsToRead()) {
  2145. FD_SET(s, &rs);
  2146. if (s > maxFd) {
  2147. maxFd = s;
  2148. }
  2149. }
  2150.  
  2151. if (session->wantsToWrite()) {
  2152. FD_SET(s, &ws);
  2153. if (s > maxFd) {
  2154. maxFd = s;
  2155. }
  2156. }
  2157. }
  2158. }
  2159.  
  2160. int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
  2161.  
  2162. if (res == 0) {
  2163. return;
  2164. }
  2165.  
  2166. if (res < 0) {
  2167. if (errno == EINTR) {
  2168. return;
  2169. }
  2170.  
  2171. ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
  2172. return;
  2173. }
  2174.  
  2175. if (FD_ISSET(mPipeFd[0], &rs)) {
  2176. char c;
  2177. ssize_t n;
  2178. do {
  2179. n = read(mPipeFd[0], &c, 1);
  2180. } while (n < 0 && errno == EINTR);
  2181.  
  2182. if (n < 0) {
  2183. ALOGW("Error reading from pipe (%s)", strerror(errno));
  2184. }
  2185.  
  2186. --res;
  2187. }
  2188.  
  2189. {
  2190. Mutex::Autolock autoLock(mLock);
  2191.  
  2192. List<sp<Session> > sessionsToAdd;
  2193.  
  2194. for (size_t i = mSessions.size(); res > 0 && i-- > 0;) {
  2195. const sp<Session> &session = mSessions.valueAt(i);
  2196.  
  2197. int s = session->socket();
  2198.  
  2199. if (s < 0) {
  2200. continue;
  2201. }
  2202.  
  2203. if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
  2204. --res;
  2205. }
  2206.  
  2207. if (FD_ISSET(s, &rs)) {
  2208. ///Add by MTK @{
  2209. if (session->isRTSPServer() || session->isTCPDatagramServer() || session->isTCPServer()) {
  2210. ///@}
  2211. struct sockaddr_in remoteAddr;
  2212. socklen_t remoteAddrLen = sizeof(remoteAddr);
  2213.  
  2214. int clientSocket = accept(
  2215. s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
  2216.  
  2217. if (clientSocket >= 0) {
  2218. status_t err = MakeSocketNonBlocking(clientSocket);
  2219.  
  2220. if (err != OK) {
  2221. ALOGE("Unable to make client socket non blocking, "
  2222. "failed w/ error %d (%s)",
  2223. err, strerror(-err));
  2224.  
  2225. close(clientSocket);
  2226. clientSocket = -1;
  2227. } else {
  2228. in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
  2229.  
  2230. ALOGI("incoming connection from %d.%d.%d.%d:%d "
  2231. "(socket %d)",
  2232. (addr >> 24),
  2233. (addr >> 16) & 0xff,
  2234. (addr >> 8) & 0xff,
  2235. addr & 0xff,
  2236. ntohs(remoteAddr.sin_port),
  2237. clientSocket);
  2238.  
  2239. sp<Session> clientSession =
  2240. new Session(
  2241. mNextSessionID++,
  2242. Session::CONNECTED,
  2243. clientSocket,
  2244. session->getNotificationMessage());
  2245.  
  2246. clientSession->setMode(
  2247. session->isRTSPServer()
  2248. ? Session::MODE_RTSP
  2249. : Session::MODE_DATAGRAM);
  2250. ///Add by MTK @{
  2251. clientSession->setTCPConnectionType(
  2252. session->getTCPConnectionType());
  2253. ///@}
  2254. sessionsToAdd.push_back(clientSession);
  2255. }
  2256. } else {
  2257. ///Add by MTK @{
  2258. ALOGE("accept returned error %d (%s)",
  2259. errno, strerror(errno));
  2260. ///@}
  2261. }
  2262. } else {
  2263. status_t err = session->readMore();
  2264. if (err != OK) {
  2265. ///Add by MTK @{
  2266. ALOGI("readMore on socket %d failed w/ error %d (%s)",
  2267. s, err, strerror(-err));
  2268. ///@}
  2269. }
  2270. }
  2271. }
  2272.  
  2273. if (FD_ISSET(s, &ws)) {
  2274. status_t err = session->writeMore();
  2275. if (err != OK) {
  2276. ALOGI("writeMore on socket %d failed w/ error %d (%s)",
  2277. s, err, strerror(-err));
  2278. }
  2279. }
  2280. }
  2281.  
  2282. while (!sessionsToAdd.empty()) {
  2283. sp<Session> session = *sessionsToAdd.begin();
  2284. sessionsToAdd.erase(sessionsToAdd.begin());
  2285.  
  2286. mSessions.add(session->sessionID(), session);
  2287.  
  2288. ALOGI("added clientSession %d", session->sessionID());
  2289. }
  2290. }
  2291. }
  2292.  
  2293. /// @{
  2294.  
  2295. status_t ANetworkSession::sendDirectRequest(
  2296. int32_t sessionID, const void *data, ssize_t size) {
  2297.  
  2298. ssize_t index = mSessions.indexOfKey(sessionID);
  2299.  
  2300. if (index < 0) {
  2301. return -ENOENT;
  2302. }
  2303.  
  2304. const sp<Session> session = mSessions.valueAt(index);
  2305.  
  2306. status_t err = session->writeDirectRequest(data, size);
  2307.  
  2308. return err;
  2309. }
  2310.  
  2311.  
  2312. status_t ANetworkSession::createTCPTextDataSession(
  2313. const struct in_addr &addr, unsigned port,
  2314. const sp<AMessage> &notify, int32_t *sessionID) {
  2315. return createClientOrServer(
  2316. kModeCreateTCPTextDataSessionPassive,
  2317. &addr,
  2318. port,
  2319. NULL /* remoteHost */,
  2320. 0 /* remotePort */,
  2321. notify,
  2322. sessionID);
  2323. }
  2324.  
  2325. status_t ANetworkSession::createUIBCClient(
  2326. const char *host, unsigned port, const sp<AMessage> &notify,
  2327. int32_t *sessionID) {
  2328. return createClientOrServer(
  2329. kModeCreateUIBCClient,
  2330. NULL /* addr */,
  2331. 0 /* port */,
  2332. host,
  2333. port,
  2334. notify,
  2335. sessionID);
  2336. }
  2337.  
  2338. status_t ANetworkSession::createUIBCServer(
  2339. const struct in_addr &addr, unsigned port,
  2340. const sp<AMessage> &notify, int32_t *sessionID) {
  2341. return createClientOrServer(
  2342. kModeCreateUIBCServer,
  2343. &addr,
  2344. port,
  2345. NULL /* remoteHost */,
  2346. 0 /* remotePort */,
  2347. notify,
  2348. sessionID);
  2349. }
  2350. ///M: Add for RTP data control @{
  2351. status_t ANetworkSession::mtkRTPRecvPause(int32_t sessionID)
  2352. {
  2353. Mutex::Autolock autoLock(mLock);
  2354.  
  2355. ssize_t index = mSessions.indexOfKey(sessionID);
  2356.  
  2357. if (index < 0) {
  2358. return -ENOENT;
  2359. }
  2360.  
  2361. const sp<Session> session = mSessions.valueAt(index);
  2362. return session->mtkRTPRecvPause();
  2363. }
  2364. status_t ANetworkSession::mtkRTPRecvResume(int32_t sessionID)
  2365. {
  2366.  
  2367. Mutex::Autolock autoLock(mLock);
  2368.  
  2369. ssize_t index = mSessions.indexOfKey(sessionID);
  2370.  
  2371. if (index < 0) {
  2372. return -ENOENT;
  2373. }
  2374. const sp<Session> session = mSessions.valueAt(index);
  2375. return session->mtkRTPRecvResume();
  2376. }
  2377.  
  2378. int64_t ANetworkSession::getRTPRecvNum(int32_t sessionID){
  2379.  
  2380. Mutex::Autolock autoLock(mLock);
  2381.  
  2382. ssize_t index = mSessions.indexOfKey(sessionID);
  2383.  
  2384. if (index < 0) {
  2385. return -ENOENT;
  2386. }
  2387. const sp<Session> session = mSessions.valueAt(index);
  2388. return session->getRTPRecvNum();
  2389.  
  2390. }
  2391. status_t ANetworkSession::resetRTPRecvNum(int32_t sessionID){
  2392.  
  2393. Mutex::Autolock autoLock(mLock);
  2394.  
  2395. ssize_t index = mSessions.indexOfKey(sessionID);
  2396.  
  2397. if (index < 0) {
  2398. return -ENOENT;
  2399. }
  2400. const sp<Session> session = mSessions.valueAt(index);
  2401. return session->resetRTPRecvNum();
  2402.  
  2403. }
  2404.  
  2405.  
  2406. ///@}
  2407. status_t ANetworkSession::setNetworkSessionTestMode()
  2408. {
  2409. ALOGD("Set Sigma Test Mode ...");
  2410.  
  2411. mTestMode = true;
  2412. return OK;
  2413. }
  2414.  
  2415. bool ANetworkSession::Session::isTCPServer() const {
  2416. return (mState == LISTENING_TCP_TEXT || mState == LISTENING_TCP_UIBC);
  2417. }
  2418.  
  2419. void ANetworkSession::Session::setTCPConnectionType(int tcpType) {
  2420. ALOGD("setIsTCPDatagramConnection:%d", tcpType);
  2421. mTcpType = tcpType;
  2422. }
  2423.  
  2424. int ANetworkSession::Session::getTCPConnectionType() const {
  2425. ALOGD("getIsTCPDatagramConnection:%d", mTcpType);
  2426. return mTcpType;
  2427. }
  2428.  
  2429. void ANetworkSession::Session::closeSocket() {
  2430.  
  2431. if(mSocket >= 0){
  2432. close(mSocket);
  2433. mSocket = -1;
  2434. }
  2435.  
  2436. }
  2437. status_t ANetworkSession::Session::mtkRTPRecvPause(){
  2438. ALOGD("RTP Receive Pause");
  2439. mRTPPause = true;
  2440. return OK;
  2441. }
  2442.  
  2443. status_t ANetworkSession::Session::mtkRTPRecvResume(){
  2444. ALOGD("RTP Receive Resume");
  2445. mRTPPause = false;
  2446. return OK;
  2447. }
  2448.  
  2449. int64_t ANetworkSession::Session::getRTPRecvNum(){
  2450.  
  2451. ALOGD("Receive RTP Number is %lld",mRTPCounter);
  2452. return mRTPCounter;
  2453. }
  2454. status_t ANetworkSession::Session::resetRTPRecvNum(){
  2455.  
  2456. ALOGD("Reset receive RTP Number 0 ");
  2457. mRTPCounter = 0;
  2458. return OK;
  2459. }
  2460.  
  2461.  
  2462.  
  2463.  
  2464.  
  2465. /// @}
  2466. } // namespace android
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement