Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import 'dart:convert';
- import 'dart:io';
- import 'dart:core';
- import 'dart:async';
- import 'dart:math';
- import 'dart:typed_data';
- import 'package:cure/crypto.dart';
- import 'package:pedantic/pedantic.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
- import 'package:aws_url_signer/aws_url_signer.dart';
- // import 'aws-signature-v4.dart';
- // import 'aws-signature-v4.dart';
- const AWS_ACCESS_KEY_ID = "AKIAXMZKJ7Q434VVG2XX";
- const AWS_SECRET_ACCESS_KEY = "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";
- int _read32bitInt(Uint8List bytesList, int start) {
- final bytes = bytesList.sublist(start, start+4);
- int r = (bytes[0] << 24) + (bytes[1] << 16) + (bytes[2] << 8) + bytes[3];
- return r;
- }
- class EventStreamHeader {
- final String name;
- final int type;
- final String value;
- EventStreamHeader(this.name, this.type, this.value);
- }
- class EventStreamResponseException implements Exception {
- final String type;
- final String message;
- EventStreamResponseException({this.type, this.message});
- String toString() => "EventStreamResponseException: $message";
- }
- Uint8List encodeEventStreamHeaders(List<EventStreamHeader> headers) {
- var bytesInt = <int>[];
- for (final header in headers) {
- bytesInt.add(header.name.length);
- bytesInt.addAll(header.name.codeUnits);
- bytesInt.add(header.type);
- bytesInt.add(header.value.length);
- bytesInt.addAll(header.value.codeUnits);
- }
- return Uint8List.fromList(bytesInt);
- }
- List<EventStreamHeader> decodeEventStreamHeaders(Uint8List headersData) {
- var lastEnd = 0;
- var headers = <EventStreamHeader>[];
- while (lastEnd < headersData.length) {
- final nameLen = headersData[0];
- final name = String.fromCharCodes(headersData.sublist(1, 1 + nameLen));
- final type = headersData[1 + nameLen];
- final valueLen = headersData[nameLen + 2] << 8 + headersData[nameLen + 3];
- final valueStart = nameLen + 4;
- lastEnd = valueStart + valueLen;
- final value = String.fromCharCodes(
- headersData.sublist(valueStart, lastEnd));
- headers.add(EventStreamHeader(name, type, value));
- }
- return headers;
- }
- Uint8List createEventStreamFrame(Uint8List audioChunk) {
- final headers = [
- EventStreamHeader(":content-type", 7, "application/octet-stream"),
- EventStreamHeader(":event-type", 7, "AudioEvent"),
- EventStreamHeader(":message-type", 7, "event")
- ];
- final headersData = encodeEventStreamHeaders(headers);
- final int totalLength = 16 + audioChunk.lengthInBytes + headersData.lengthInBytes;
- // final prelude = [headersData.length, totalLength];
- // print("Prelude: " + prelude.toString());
- // Convert a 32b int to 4 bytes
- List<int> int32ToBytes(int i) { return [(0xFF000000 & i) >> 24, (0x00FF0000 & i) >> 16, (0x0000FF00 & i) >> 8, (0x000000FF & i)]; }
- final audioBytes = ByteData.sublistView(audioChunk);
- var offset = 0;
- var audioDataList = <int>[];
- while (offset < audioBytes.lengthInBytes) {
- audioDataList.add(audioBytes.getInt16(offset, Endian.little));
- offset += 2;
- }
- final crc = CRC.crc32();
- final messageBldr = BytesBuilder();
- messageBldr.add(int32ToBytes(totalLength));
- messageBldr.add(int32ToBytes(headersData.length));
- // Now we can calc the CRC. We need to do it on the bytes, not the Ints
- final preludeCrc = crc.calculate(messageBldr.toBytes());
- // Continue adding data
- messageBldr.add(int32ToBytes(preludeCrc));
- messageBldr.add(headersData.toList());
- // messageBldr.add(audioChunk.toList());
- messageBldr.add(audioDataList);
- final messageCrc = crc.calculate(messageBldr.toBytes().toList());
- messageBldr.add(int32ToBytes(messageCrc));
- final frame = messageBldr.toBytes();
- //print("${frame.length} == $totalLength");
- return frame;
- }
- dynamic decodeEventStreamResponse(Uint8List resp) {
- final totalLen = _read32bitInt(resp, 0);
- //print("${totalLen} == ${resp.length}");
- final headersLen = _read32bitInt(resp, 4);
- final preludeCRC = _read32bitInt(resp, 8);
- final messageCrc = _read32bitInt(resp, resp.length-4);
- final preludeBytes = resp.sublist(0, 8);
- final messageBytes = resp.sublist(0, totalLen - 4);
- final payloadStart = 12 + headersLen;
- final headersBytes = resp.sublist(12, payloadStart);
- final payloadLen = totalLen - 16 - headersLen;
- final payloadBytes = resp.sublist(payloadStart, payloadStart + payloadLen);
- // Check the CRCs
- final crc = CRC.crc32();
- if (!crc.verify(messageBytes.toList(growable: false), messageCrc)) {
- throw Exception("Message CRC failed");
- }
- if (!crc.verify(preludeBytes.toList(growable: false), preludeCRC)) {
- throw Exception("Prelude CRC failed");
- }
- final headers = decodeEventStreamHeaders(headersBytes);
- final body = String.fromCharCodes(messageBytes);
- // print(headers);
- // print(body);
- final messageType = headers
- .firstWhere((header) => header.name == ':message-type')
- .value;
- // EXCEPTION RESPONSE
- if (messageType == "exception") {
- final exceptionType = headers
- .firstWhere((header) => header.name == ':exception-type')
- .value;
- // parse body as string
- throw EventStreamResponseException(type: exceptionType, message: body);
- }
- if (messageType != 'event') { throw Exception("Unknown message-type: " + messageType); }
- // Parse body as JSON string
- // DECODE JSON
- final json = JsonDecoder().convert(body);
- // print(json);
- return json;
- }
- void _printBytes(Uint8List bytes) {
- print([
- for (final x in bytes)
- if (x >= 32 && x <= 127 ) String.fromCharCode(x)
- else
- "<${x.toRadixString(16).toUpperCase()}>"
- ].join());
- }
- void main(List<String> arguments) async {
- print("ENDIAN: " + (Endian.host == Endian.little ? 'Little' : 'Big'));
- final audioBytes = File("./bin/test5.dat").readAsBytesSync();
- // final R = Random();
- // final audioBytes = Uint8List.fromList(List<int>.from([for (var x=0; x<16000*5; x++) R.nextInt(0xFF)]));
- // final audioData = Uint16List.fromList([for (var x=32; x<128; x++) x ]).buffer.asUint8List();
- // final audioWavBuff = audioFile.readAsBytesSync();
- // print(audioWavBuff.getRange(37,40));
- //
- // audioStream.forEach((element) {
- // print('SENDING...');
- // final bytes = base64.encode(element);
- // //audioStream.sendData(bytes);
- // });
- final signedUrl = getSignedWebSocketUrl(
- apiId: 'transcribestreaming',
- service: 'transcribe',
- accessKey: AWS_ACCESS_KEY_ID,
- secretKey: AWS_SECRET_ACCESS_KEY,
- region: REGION,
- stage: "stream-transcription-websocket",
- queryParams: {
- 'language-code': 'en-US',
- 'media-encoding': 'pcm',
- 'sample-rate': '16000',
- },
- debug: false
- );
- final streamUrl = Uri.parse(signedUrl);
- final channel = WebSocketChannel.connect(streamUrl);
- // channel.stream.asBroadcastStream(onListen: (subscription) {
- // subscription.onData((data) { print("DATA! " + data.toString());});
- // subscription.onError((){ print("ERROR");});
- // }, onCancel: (subscription) {
- // print("CANCEL");
- // },);
- sendNextChunk(int head) async {
- final readEnd = 16000 * 1 + head;
- print("SENDING: ${head} to ${readEnd}");
- final dataToSend = audioBytes.sublist(head, readEnd);
- // print(dataToSend.reduce((value, element) => value + element));
- final frame = createEventStreamFrame(dataToSend);
- // _printBytes(frame);
- channel.sink.add(frame);
- // wait 1 sec and iterate
- await Future.delayed(Duration(milliseconds: 500));
- sendNextChunk(readEnd);
- }
- // Start sending...
- sendNextChunk(0);
- readNext() async {
- final responses = await channel.stream.toList();
- print("Resp len: " + responses.length.toString());
- if (responses.length == 0) { readNext(); }
- final resp = responses[0];
- //print(resp);
- final buffer = Uint8List.fromList(resp);
- print('RECEVIED!');
- _printBytes(buffer);
- // final json = decodeEventStreamResponse(buffer);
- // print('decoded');
- // print(json);
- readNext();
- }
- readNext();
- //
- //:message-typeappexception{"Message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.\n\nThe Canonical String for this request should have been\n'GET\n/stream-transcription-websocket\nAccept=application%2Fjson&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAXMZKJ7Q434VVG2XX%2F20210614%2Feu-central-1%2Ftranscribe%2Faws4_request&X-Amz-Date=20210614T145716Z&X-Amz-Expires=15&X-Amz-SignedHeaders=accept%3Bhost%3Bx-amz-content-sha256%3Bx-amz-date&language-code=en-US&media-encoding=pcm&sample-rate=44100\naccept:\nhost:transcribestreaming.eu-central-1.amazonaws.com:8443\nx-amz-content-sha256:\nx-amz-date:\n\naccept;host;x-amz-content-sha256;x-amz-date\ne3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'\n\nThe String-to-Sign should have been\n'AWS4-HMAC-SHA256\n20210614T145716Z\n20210614/eu-central-1/transcribe/aws4_request\n8d02df66b5964115b115db75576558e2103c2e3de98864c7a2e1e807f0bbbb05'\n"}óß+y
- // channel.sink.add("aaaaaaaaaaaaaa");
- //
- // // let query = "language-code=" + languageCode + "'media-encoding=pcm&sample-rate': '" + sampleRate',
- // // if (showSpeakerLabel) {
- // // query += ''show-speaker-label': '' + showSpeakerLabel',
- // // }
- // //
- // // return createPresignedURL(
- // // "GET",
- // // endpoint,
- // // "/stream-transcription-websocket",
- // // "transcribe",
- // // crypto.createHash("sha256").update("", "utf8").digest("hex"),
- // // {
- // // key: this.accessKeyId,
- // // secret: this.secretAccessKey,
- // // sessionToken: this.sessionToken,
- // // protocol: "wss",
- // // expires: 15,
- // // region: region,
- // // query: query,
- // // }
- // // )
- //
- //
- //
- // final headers = client.signedHeaders(
- // uri.toString()
- // );
- //
- // headers.addAll({
- // 'method': 'POST',
- // 'path': uri.path,
- // 'scheme': uri.scheme,
- // 'authority': uri.host,
- // 'x-amz-content-sha256': 'STREAMING-AWS4-HMAC-SHA256-EVENTS',
- // "x-amz-target": "com.amazonaws.transcribe.Transcribe.StartStreamTranscription",
- // "x-amzn-transcribe-language-code": "en-US",
- // "x-amz-transcribe-sample-rate": "44100",
- // 'x-amzn-transcribe-media-encoding': 'pcm',
- // "transfer-encoding": "chunked"
- // });
- //
- // // Convert to header array
- // var headersProper = <Header>[];
- // headers.forEach((key, value) {
- // headersProper.add(Header.ascii(":$key", value));
- // });
- //
- //
- // headersProper.forEach((e) {
- // Header h = e;
- // stdout.writeln("${String.fromCharCodes(h.name)}: ${String.fromCharCodes(h.value)}");
- // });
- //
- // final socket = await connect(uri);
- // final transport = new ClientTransportConnection.viaSocket(socket);
- // print(transport.isOpen ? "YESSSS" : "NOOOOO");
- // var stream = transport.makeRequest(headersProper, endStream: true);
- // // stream.sendData([1,2,3]);
- //
- // stream.incomingMessages.forEach((element) {print(element);});
- // try {
- // await for (var message in stream.incomingMessages) {
- // if (message is HeadersStreamMessage) {
- // for (var header in message.headers) {
- // var name = utf8.decode(header.name);
- // var value = utf8.decode(header.value);
- // print('$name: $value');
- // }
- // } else if (message is DataStreamMessage) {
- // // Use [message.bytes] (but respect 'content-encoding' header)
- // }
- // }
- // } on Exception catch(e) {
- // print(e.toString());
- // }
- // await transport.finish();
- // final ClientTransportStream stream = transport.makeRequest(
- // headersProper, endStream: false);
- //
- // stream.incomingMessages.forEach((element) {
- // print("LISTENING...");
- // print(element);
- // });
- // .asBroadcastStream(onListen: (subscription) {
- // print("LISTENING...");
- // subscription.onData((data) {
- // print('RECEIVED...');
- // print(data);
- // });
- // });
- // stream.sendData([1,2,3]);
- // print("STREAMING...");
- // final audioStream = audioFile.transform(latin1.decoder);
- //
- // audioStream.forEach((element) {
- // print('SENDING...');
- // final bytes = latin1.encode(element);
- // stream.sendData(bytes);
- // });
- }
- const REGION = 'eu-central-1';
- //
- // await for (var message in stream.incomingMessages) {
- // if (message is HeadersStreamMessage) {
- // for (var header in message.headers) {
- // var name = utf8.decode(header.name);
- // var value = utf8.decode(header.value);
- // print('Header: $name: $value');
- // }
- // } else if (message is DataStreamMessage) {
- // // Use [message.bytes] (but respect 'content-encoding' header)
- // }
- // }
- // await transport.finish();
- Future<Socket> connect(Uri uri) async {
- var useSSL = uri.scheme == 'https';
- if (useSSL) {
- var secureSocket = await SecureSocket.connect(uri.host, uri.port,
- supportedProtocols: ['h2']);
- if (secureSocket.selectedProtocol != 'h2') {
- throw Exception('Failed to negogiate http/2 via alpn. Maybe server '
- "doesn't support http/2.");
- }
- return secureSocket;
- } else {
- return await Socket.connect(uri.host, uri.port);
- }
- }
Add Comment
Please, Sign In to add comment