Guest User

dart-aws-transcribe-stream

a guest
Jun 18th, 2021
350
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Dart 13.41 KB | None | 0 0
  1. import 'dart:convert';
  2. import 'dart:io';
  3. import 'dart:core';
  4. import 'dart:async';
  5. import 'dart:math';
  6. import 'dart:typed_data';
  7. import 'package:cure/crypto.dart';
  8. import 'package:pedantic/pedantic.dart';
  9. import 'package:web_socket_channel/web_socket_channel.dart';
  10. import 'package:aws_url_signer/aws_url_signer.dart';
  11.  
  12. // import 'aws-signature-v4.dart';
  13.  
  14.  
  15. // import 'aws-signature-v4.dart';
  16.  
  17.  
  // SECURITY: the access key pair previously hard-coded here was published
  // and must be treated as compromised — deactivate and rotate it in IAM.
  // Credentials now come from the environment and are never committed.

  /// AWS access key ID, read from the `AWS_ACCESS_KEY_ID` environment
  /// variable; empty when the variable is unset.
  final String AWS_ACCESS_KEY_ID =
      Platform.environment['AWS_ACCESS_KEY_ID'] ?? '';

  /// AWS secret access key, read from the `AWS_SECRET_ACCESS_KEY` environment
  /// variable; empty when the variable is unset.
  final String AWS_SECRET_ACCESS_KEY =
      Platform.environment['AWS_SECRET_ACCESS_KEY'] ?? '';
  20.  
  21.  
  /// Returns the unsigned 32-bit big-endian integer stored in
  /// `bytesList[start]` .. `bytesList[start + 3]`.
  ///
  /// Throws a [RangeError] when four bytes are not available at [start].
  int _read32bitInt(Uint8List bytesList, int start) =>
      ByteData.sublistView(bytesList).getUint32(start, Endian.big);
  27.  
  28.  
  29.  
  /// One header of an event-stream message: a name, a one-byte value-type tag
  /// and a value held as a [String].
  ///
  /// The outgoing audio frames built in this file use type tag `7` for their
  /// string-valued headers.
  class EventStreamHeader {
    /// Header name, such as `:message-type`.
    final String name;

    /// Value-type tag as written to / read from the wire.
    final int type;

    /// Header value.
    final String value;

    const EventStreamHeader(this.name, this.type, this.value);
  }
  36.  
  37.  
  /// Thrown when a decoded event-stream message has `:message-type`
  /// `exception`.
  ///
  /// [type] holds the value of the `:exception-type` header and [message] the
  /// text of the message body.
  class EventStreamResponseException implements Exception {
    /// Value of the `:exception-type` header.
    final String type;

    /// Text of the exception message body.
    final String message;

    EventStreamResponseException({this.type, this.message});

    /// Includes [type] when present, since it identifies the failure class.
    @override
    String toString() => type == null || type.isEmpty
        ? 'EventStreamResponseException: $message'
        : 'EventStreamResponseException ($type): $message';
  }
  46.  
  47.  
  /// Serialises [headers] into the header section of an event-stream message.
  ///
  /// Each header is written as: 1-byte name length, UTF-8 name, 1-byte value
  /// type, 2-byte big-endian value length, UTF-8 value. This is the
  /// length-prefixed layout, which is the only one a [String]-valued
  /// [EventStreamHeader] can express; callers in this file use type `7`.
  ///
  /// Throws an [ArgumentError] if a UTF-8 encoded name exceeds 255 bytes or a
  /// UTF-8 encoded value exceeds 65535 bytes, since those cannot fit in their
  /// length fields.
  Uint8List encodeEventStreamHeaders(List<EventStreamHeader> headers) {
    final out = BytesBuilder(copy: false);
    for (final header in headers) {
      final nameBytes = utf8.encode(header.name);
      final valueBytes = utf8.encode(header.value);
      if (nameBytes.length > 0xFF) {
        throw ArgumentError.value(
            header.name, 'headers', 'header name longer than 255 bytes');
      }
      if (valueBytes.length > 0xFFFF) {
        throw ArgumentError.value(
            header.value, 'headers', 'header value longer than 65535 bytes');
      }
      out
        ..addByte(nameBytes.length)
        ..add(nameBytes)
        ..addByte(header.type)
        // The value length is a 16-bit big-endian field (two bytes), which is
        // also how decodeEventStreamHeaders reads it back.
        ..addByte(valueBytes.length >> 8)
        ..addByte(valueBytes.length & 0xFF)
        ..add(valueBytes);
    }
    return out.toBytes();
  }
  61.  
  /// Parses the header section of an event-stream message.
  ///
  /// Walks [headersData] header by header. Each header is read as: 1-byte
  /// name length, UTF-8 name, 1-byte value type, then a 2-byte big-endian
  /// value length and the value bytes. Only the length-prefixed value types
  /// are supported: 6 (byte array, mapped one byte per char code) and 7
  /// (string, UTF-8 decoded).
  ///
  /// Throws a [FormatException] if the data is truncated or a header uses any
  /// other value type.
  List<EventStreamHeader> decodeEventStreamHeaders(Uint8List headersData) {
    const byteArrayType = 6;
    const stringType = 7;
    final headers = <EventStreamHeader>[];
    var pos = 0;

    // Reports truncated input as a FormatException instead of a RangeError.
    void requireBytes(int count) {
      if (pos + count > headersData.length) {
        throw FormatException(
            'Truncated event-stream header section', headersData, pos);
      }
    }

    while (pos < headersData.length) {
      // Every field is read relative to `pos`, the start of the current
      // header, so the loop advances through all headers.
      final nameLen = headersData[pos++];
      requireBytes(nameLen + 1); // name + value-type byte
      final name = utf8.decode(headersData.sublist(pos, pos + nameLen));
      pos += nameLen;

      final type = headersData[pos++];
      if (type != byteArrayType && type != stringType) {
        throw FormatException(
            'Unsupported event-stream header value type $type for "$name"',
            headersData,
            pos - 1);
      }

      requireBytes(2);
      // Parenthesised on purpose: in Dart `+` binds tighter than `<<`.
      final valueLen = (headersData[pos] << 8) | headersData[pos + 1];
      pos += 2;

      requireBytes(valueLen);
      final valueBytes = headersData.sublist(pos, pos + valueLen);
      pos += valueLen;

      final value = type == stringType
          ? utf8.decode(valueBytes)
          : String.fromCharCodes(valueBytes);
      headers.add(EventStreamHeader(name, type, value));
    }
    return headers;
  }
  78.  
  79.  
  /// Wraps [audioChunk] in an event-stream `AudioEvent` message.
  ///
  /// Layout (all integers big-endian): total length (4 bytes), headers length
  /// (4), prelude CRC (4), headers, payload, message CRC (4). The payload is
  /// [audioChunk] byte-for-byte; an empty [audioChunk] yields a frame with an
  /// empty payload.
  Uint8List createEventStreamFrame(Uint8List audioChunk) {
    final headersData = encodeEventStreamHeaders([
      EventStreamHeader(':content-type', 7, 'application/octet-stream'),
      EventStreamHeader(':event-type', 7, 'AudioEvent'),
      EventStreamHeader(':message-type', 7, 'event'),
    ]);

    // 12-byte prelude + headers + payload + 4-byte trailing message CRC.
    final totalLength = 16 + headersData.length + audioChunk.length;

    // Splits a 32-bit value into four big-endian bytes.
    List<int> uint32BigEndian(int value) => [
          (value >> 24) & 0xFF,
          (value >> 16) & 0xFF,
          (value >> 8) & 0xFF,
          value & 0xFF,
        ];

    final crc = CRC.crc32();
    final message = BytesBuilder()
      ..add(uint32BigEndian(totalLength))
      ..add(uint32BigEndian(headersData.length));

    // The prelude CRC covers only the two length fields written so far.
    final preludeCrc = crc.calculate(message.toBytes());

    message
      ..add(uint32BigEndian(preludeCrc))
      ..add(headersData)
      // Payload bytes are forwarded unchanged. Re-reading them as Int16
      // samples and adding those ints to a BytesBuilder would keep only each
      // sample's low byte and emit half the bytes totalLength announces.
      ..add(audioChunk);

    // The message CRC covers everything before it, prelude included.
    final messageCrc = crc.calculate(message.toBytes().toList());
    message.add(uint32BigEndian(messageCrc));
    return message.toBytes();
  }
  122.  
  123.  
  /// Decodes one event-stream message received from the service.
  ///
  /// Verifies the prelude CRC, then the message CRC, parses the headers and:
  /// * for `:message-type` `exception`, throws [EventStreamResponseException]
  ///   with the `:exception-type` header and the payload text;
  /// * for `:message-type` `event`, returns the payload decoded as JSON.
  ///
  /// Throws a [FormatException] if [resp] is shorter than its prelude claims,
  /// the lengths are inconsistent, or a required header is missing. Throws a
  /// plain [Exception] on a CRC mismatch or an unknown message type.
  dynamic decodeEventStreamResponse(Uint8List resp) {
    const preludeLen = 12; // total length + headers length + prelude CRC
    const trailerLen = 4; // message CRC
    if (resp.length < preludeLen + trailerLen) {
      throw FormatException(
          'Event-stream message too short', resp, resp.length);
    }

    final crc = CRC.crc32();

    // Check the prelude first: if it is corrupt, the lengths cannot be trusted.
    final preludeCrc = _read32bitInt(resp, 8);
    if (!crc.verify(resp.sublist(0, 8).toList(growable: false), preludeCrc)) {
      throw Exception("Prelude CRC failed");
    }

    final totalLen = _read32bitInt(resp, 0);
    final headersLen = _read32bitInt(resp, 4);
    if (totalLen > resp.length ||
        preludeLen + headersLen + trailerLen > totalLen) {
      throw FormatException(
          'Inconsistent event-stream lengths: total $totalLen, headers '
          '$headersLen, received ${resp.length}');
    }

    // The message CRC sits at the end of this message (totalLen), which need
    // not coincide with the end of the buffer.
    final messageEnd = totalLen - trailerLen;
    final messageCrc = _read32bitInt(resp, messageEnd);
    if (!crc.verify(
        resp.sublist(0, messageEnd).toList(growable: false), messageCrc)) {
      throw Exception("Message CRC failed");
    }

    final payloadStart = preludeLen + headersLen;
    final headers =
        decodeEventStreamHeaders(resp.sublist(preludeLen, payloadStart));
    // Only the payload is the body; the prelude and headers are binary.
    final body = utf8.decode(resp.sublist(payloadStart, messageEnd));

    String headerValue(String name) {
      for (final header in headers) {
        if (header.name == name) return header.value;
      }
      throw FormatException('Missing event-stream header $name');
    }

    final messageType = headerValue(':message-type');
    if (messageType == "exception") {
      throw EventStreamResponseException(
          type: headerValue(':exception-type'), message: body);
    }
    if (messageType != 'event') {
      throw Exception("Unknown message-type: " + messageType);
    }
    return jsonDecode(body);
  }
  171.  
  /// Prints [bytes] on one line for debugging.
  ///
  /// Printable ASCII (0x20..0x7E) is shown as-is; every other byte, including
  /// DEL (0x7F), is shown as `<HH>` in upper-case hex.
  void _printBytes(Uint8List bytes) {
    final rendered = StringBuffer();
    for (final byte in bytes) {
      final printable = byte >= 0x20 && byte < 0x7F;
      rendered.write(printable
          ? String.fromCharCode(byte)
          : '<${byte.toRadixString(16).toUpperCase()}>');
    }
    print(rendered.toString());
  }
  180.  
  181.  
  182.  
  183.  
  /// Streams `./bin/test5.dat` to Amazon Transcribe over a signed WebSocket and
  /// prints every response.
  ///
  /// Audio is sent in 16000-byte chunks every 500 ms, followed by one frame
  /// with an empty payload to signal that no more audio is coming. Responses
  /// are decoded with [decodeEventStreamResponse] as they arrive.
  ///
  /// NOTE(review): assumes test5.dat holds raw 16-bit PCM at 16 kHz, matching
  /// the `sample-rate` query parameter below — confirm.
  void main(List<String> arguments) async {
    print("ENDIAN: " + (Endian.host == Endian.little ? 'Little' : 'Big'));

    if (AWS_ACCESS_KEY_ID.isEmpty || AWS_SECRET_ACCESS_KEY.isEmpty) {
      stderr.writeln(
          'Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before running.');
      exitCode = 1;
      return;
    }

    final audioBytes = File("./bin/test5.dat").readAsBytesSync();

    final signedUrl = getSignedWebSocketUrl(
      apiId: 'transcribestreaming',
      service: 'transcribe',
      accessKey: AWS_ACCESS_KEY_ID,
      secretKey: AWS_SECRET_ACCESS_KEY,
      region: REGION,
      stage: "stream-transcription-websocket",
      queryParams: {
        'language-code': 'en-US',
        'media-encoding': 'pcm',
        'sample-rate': '16000',
      },
      debug: false,
    );
    final channel = WebSocketChannel.connect(Uri.parse(signedUrl));

    // Cleared once the response stream ends so the sender stops early.
    var receiving = true;

    // Sends the audio chunk by chunk. The loop stops at the end of the data
    // (the last chunk may be shorter) instead of slicing past it.
    Future<void> sendAudio() async {
      const chunkSize = 16000;
      // 16000 bytes = 8000 16-bit samples = 0.5 s at 16 kHz.
      const chunkInterval = Duration(milliseconds: 500);
      for (var head = 0;
          head < audioBytes.length && receiving;
          head += chunkSize) {
        final readEnd = min(head + chunkSize, audioBytes.length);
        print("SENDING: ${head} to ${readEnd}");
        channel.sink
            .add(createEventStreamFrame(audioBytes.sublist(head, readEnd)));
        await Future.delayed(chunkInterval);
      }
      if (receiving) {
        // An audio event with an empty payload marks the end of the audio.
        channel.sink.add(createEventStreamFrame(Uint8List(0)));
      }
    }

    // Decodes responses as they arrive. channel.stream is single-subscription,
    // so it is consumed by exactly one `await for`.
    Future<void> receiveResults() async {
      try {
        await for (final message in channel.stream) {
          if (message is! List<int>) {
            print('Unexpected text frame: $message');
            continue;
          }
          final buffer = Uint8List.fromList(message);
          print('RECEIVED!');
          try {
            print(decodeEventStreamResponse(buffer));
          } on EventStreamResponseException catch (e) {
            // The service reported an error; nothing more useful will follow.
            print(e);
            return;
          } on Exception catch (e) {
            print('Failed to decode response: $e');
            _printBytes(buffer);
          }
        }
      } finally {
        receiving = false;
      }
    }

    try {
      await Future.wait([sendAudio(), receiveResults()], eagerError: true);
    } finally {
      await channel.sink.close();
    }
  }
  376.  
  /// AWS region whose Transcribe streaming endpoint [main] connects to.
  const String REGION = 'eu-central-1';
  378.  
  379. //
  380. // await for (var message in stream.incomingMessages) {
  381. //   if (message is HeadersStreamMessage) {
  382. //     for (var header in message.headers) {
  383. //       var name = utf8.decode(header.name);
  384. //       var value = utf8.decode(header.value);
  385. //       print('Header: $name: $value');
  386. //     }
  387. //   } else if (message is DataStreamMessage) {
  388. //     // Use [message.bytes] (but respect 'content-encoding' header)
  389. //   }
  390. // }
  391. // await transport.finish();
  392.  
  393.  
  /// Opens a socket to [uri]'s host and port.
  ///
  /// For `https` URIs the socket is TLS and must negotiate HTTP/2 (`h2`) via
  /// ALPN; otherwise a plain TCP socket is returned.
  ///
  /// Throws an [Exception] if the TLS server does not select `h2`; the
  /// socket is destroyed first so it does not leak.
  Future<Socket> connect(Uri uri) async {
    if (uri.scheme != 'https') {
      return Socket.connect(uri.host, uri.port);
    }

    final secureSocket = await SecureSocket.connect(uri.host, uri.port,
        supportedProtocols: ['h2']);
    if (secureSocket.selectedProtocol != 'h2') {
      // Release the connection before failing; the caller never receives it.
      secureSocket.destroy();
      throw Exception('Failed to negotiate http/2 via alpn. Maybe server '
          "doesn't support http/2.");
    }
    return secureSocket;
  }
  408.  
Add Comment
Please, Sign In to add comment