Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env escript
- cDS_PACKET_RELAY_DELAY() -> 1. % delay inserted between splitted segments sent by downstream (in ms)
- cUS_PACKET_RELAY_DELAY() -> 1. % delay inserted between splitted segments sent by upstream (in ms)
- cSPLIT_DS_SEGMENT_LEN() -> 1. % downstream segment length to split into (in byte)
- cSPLIT_US_SEGMENT_LEN() -> 1. % upstream segment length to split into (in byte)
- cSPLIT_DS_PACKET() -> true. % whether split packet sent by downstream into segments before relaying to upstream
- cSPLIT_US_PACKET() -> true. % whether split packet sent by upstream into segments before relaying to downstream
- cFORCE_ACCURATE_SPLIT() -> false. % whether force accurate splitting (don't relay if there's not enough data for a segment)
- cCLOSE_INSTEAD_OF_DELAY() -> false. % close upstream/downstream connection instead of delay a certain time when packet is being splitting
- cDEFAULT_DEST_ADDR() -> {127, 0, 0, 1}.
- cLSOCK_OPTS() -> [binary, {packet, 0}, {reuseaddr, true}]. % Listening socket default options
- cCSOCK_OPTS() -> [binary, {packet, 0}, {nodelay, true}]. % Client socket default options (upstream/downstream)
- main([Src, Dest]) ->
- SrcPort = list_to_integer(Src),
- {DestAddr, DestPort} = parse_addr(Dest),
- run_proxy(SrcPort, DestAddr, DestPort);
- main(_) -> usage().
- run_proxy(SP, DA, DP) ->
- io:format("Proxy tcp traffic from port ~b to ~p:~b~n", [SP, DA, DP]),
- {ok, LS} = gen_tcp:listen(SP, [{active, false} | cLSOCK_OPTS()]),
- proxy_server_loop(LS, DA, DP).
- proxy_server_loop(LSock, DA, DP) ->
- {ok, DS} = gen_tcp:accept(LSock),
- {ok, {SAddr, SPort}} = inet:peername(DS),
- {ok, {DAddr, DPort}} = inet:sockname(DS),
- io:format("*** Incoming connection from ~p:~b to ~p:~b~n", [SAddr, SPort, DAddr, DPort]),
- Pid = spawn(
- fun () ->
- receive start -> ok end,
- ok = inet:setopts(DS, [{active, true} | cCSOCK_OPTS()]),
- run_proxy_client(DS, DA, DP),
- gen_tcp:close(DS)
- end
- ),
- ok = gen_tcp:controlling_process(DS, Pid),
- Pid ! start,
- proxy_server_loop(LSock, DA, DP).
- run_proxy_client(DS, DA, DP) ->
- {ok, US} = gen_tcp:connect(DA, DP, [{active, true} | cCSOCK_OPTS()]),
- proxy_client_loop(DS, US),
- gen_tcp:close(US).
- proxy_client_loop(DS, US) ->
- receive
- {tcp, DS, Data} ->
- relay_downstream_packet(DS, US, Data, false),
- proxy_client_loop(DS, US);
- {tcp_error, DS, Reason} ->
- io:format("*** Error occured on downstream socket: ~p~n", [Reason]),
- % flushing all pending downstream packets to upstream socket
- relay_downstream_packet(DS, US, <<>>, true),
- done;
- {tcp_closed, DS} ->
- io:format("*** Downstream socket closed~n"),
- % flushing all pending downstream packets to upstream socket
- relay_downstream_packet(DS, US, <<>>, true),
- done;
- {tcp, US, Data} ->
- relay_upstream_packet(DS, US, Data, false),
- proxy_client_loop(DS, US);
- {tcp_error, US, Reason} ->
- io:format("*** Error occured on upstream socket: ~p~n", [Reason]),
- % flushing all pending upstream packets to downstream socket
- relay_upstream_packet(DS, US, <<>>, true),
- done;
- {tcp_closed, US} ->
- io:format("*** Upstream socket closed~n"),
- % flushing all pending upstream packets to downstream socket
- relay_upstream_packet(DS, US, <<>>, true),
- done;
- Other ->
- io:format("*** Invalid message: ~p~n", [Other]),
- proxy_client_loop(DS, US)
- end.
- usage() ->
- io:format("Usage: etcproxy <src port> [<dest addr>:]<dest port>~n").
- parse_addr(L) ->
- case string:tokens(L, ":") of
- [PortL] ->
- Addr = cDEFAULT_DEST_ADDR(),
- Port = list_to_integer(PortL),
- {Addr, Port};
- [AddrL, PortL] ->
- {ok, Addr} = inet_parse:ipv4_address(AddrL),
- Port = list_to_integer(PortL),
- {Addr, Port};
- _ ->
- erlang:error("invalid destination", L)
- end.
- relay_downstream_packet(DS, US, Data, Flush) ->
- io:format("*** Proxy ~b bytes from downstream to upstream: ~n~p~n", [byte_size(Data), Data]),
- SplitDsPacket = cSPLIT_DS_PACKET(),
- CloseInsteadOfDelay = cCLOSE_INSTEAD_OF_DELAY(),
- SegSize = if
- SplitDsPacket -> cSPLIT_DS_SEGMENT_LEN();
- true -> 0
- end,
- BinL = merge_packet('downstream', Data, SegSize, cFORCE_ACCURATE_SPLIT(), Flush),
- % io:format("~p~n", [BinL]),
- if SplitDsPacket and CloseInsteadOfDelay -> % send the first packet, then close connection
- gen_tcp:send(US, hd(BinL)),
- gen_tcp:close(US),
- gen_tcp:close(DS);
- SegSize =:= 0 -> % no splitting occured, send data immediately without delay
- gen_tcp:send(US, BinL);
- true -> % send all packets to upstream with delay inserted among them
- lists:foreach(
- fun (Bin) ->
- gen_tcp:send(US, Bin),
- % Emulate network packet delay
- sleep(cDS_PACKET_RELAY_DELAY())
- end,
- BinL
- )
- end,
- io:format("*** Transferred~n"),
- ok.
- relay_upstream_packet(DS, US, Data, Flush) ->
- io:format("*** Proxy ~b bytes from upstream to downstream: ~n~p~n", [byte_size(Data), Data]),
- SplitUsPacket = cSPLIT_US_PACKET(),
- CloseInsteadOfDelay = cCLOSE_INSTEAD_OF_DELAY(),
- SegSize = if
- SplitUsPacket -> cSPLIT_US_SEGMENT_LEN();
- true -> 0
- end,
- BinL = merge_packet('upstream', Data, SegSize, cFORCE_ACCURATE_SPLIT(), Flush),
- % io:format("~p~n", [BinL]),
- if SplitUsPacket and CloseInsteadOfDelay -> % send the first packet, then close connection
- gen_tcp:send(DS, hd(BinL)),
- gen_tcp:close(US),
- gen_tcp:close(DS);
- SegSize =:= 0 -> % no splitting occured, send data immediately without delay
- gen_tcp:send(DS, BinL);
- true -> % send all packets to downstream with delay inserted among them
- lists:foreach(
- fun (Bin) ->
- gen_tcp:send(DS, Bin),
- % Emulate network packet delay
- sleep(cUS_PACKET_RELAY_DELAY())
- end,
- BinL
- )
- end,
- io:format("*** Transferred~n"),
- ok.
- % @doc Merge new upstream / downstream packet with previous remaining data, and
- % return those data which can be sent to downstream / upstream. If 'Flush' is
- % true, the new packet along with all remaining data will be returned, in order
- % to flush buffered data.
- % @end
- merge_packet(Key, Data, SegSize, AccuSplit, Flush) ->
- PrevB = case get(Key) of
- undefined -> <<>>;
- Val -> Val
- end,
- if Flush -> % flush previous remaining data along with current packet
- put(Key, <<>>),
- [iolist_to_binary([PrevB, Data])];
- true ->
- if SegSize > 0 -> % need splitting to segments
- {Segs, Remain} = split_to_segments(
- SegSize,
- iolist_to_binary([PrevB, Data])
- ),
- if AccuSplit -> % force accurate splitting
- put(Key, Remain),
- Segs;
- true -> % no accurate splitting, return data even if it's not long enough for a segment
- put(Key, <<>>),
- lists:flatten([Segs, Remain])
- end;
- true -> % no limiting to data length
- put(Key, <<>>),
- [iolist_to_binary([PrevB, Data])]
- end
- end.
- split_to_segments(N, Bin) when N > 0 -> split_to_segments(N, Bin, {[], <<>>}).
- split_to_segments(_, <<>>, {L, Remain}) -> {lists:reverse(L), Remain};
- split_to_segments(N, Bin, {L, _}) when byte_size(Bin) < N -> split_to_segments(N, <<>>, {L, Bin});
- split_to_segments(N, Bin, {L, _}) ->
- <<Seg:N/binary, Remain/binary>> = Bin,
- split_to_segments(N, Remain, {[Seg | L], <<>>}).
- sleep(N) when N >= 0 ->
- receive
- after N -> ok
- end.
Add Comment
Please, Sign In to add comment