Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jun 1st, 2012  |  syntax: None  |  size: 32.74 KB  |  hits: 10  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. package POE::Component::Client::Keepalive;
  2. BEGIN {
  3.   $POE::Component::Client::Keepalive::VERSION = '0.268';
  4. }
  5.  
  6. use warnings;
  7. use strict;
  8.  
  9. use Carp qw(croak);
  10. use Errno qw(ETIMEDOUT EBADF);
  11. use Socket qw(SOL_SOCKET SO_LINGER);
  12.  
  13. use POE;
  14. use POE::Wheel::SocketFactory;
  15. use POE::Component::Connection::Keepalive;
  16. use POE::Component::Resolver;
  17. use Net::IP qw(ip_is_ipv4);
  18.  
  19. my $ssl_available;
  20. eval {
  21.   require POE::Component::SSLify;
  22.   $ssl_available = 1;
  23. };
  24.  
  25. use constant DEBUG => 0;
  26. use constant DEBUG_DNS => DEBUG || 0;
  27. use constant DEBUG_DEALLOCATE => DEBUG || 0;
  28.  
  29. # Manage connection request IDs.
  30.  
  31. my $CurrentID = 0;
  32.  
  33. my $default_resolver;
  34. my $instances = 0;
  35.  
  36. my @LookupKeys;
  37. BEGIN {
  38.   @LookupKeys = qw(
  39.     ATTR_CONNKEY_FREE
  40.     ATTR_CONNKEY_BUSY
  41.     KEY_SOCKET
  42.     KEY_CONNKEY
  43.     ATTR_CONNKEY
  44.     ATTR_SOCKET_USED
  45.     ATTR_SOCKET_FREE
  46.     KEY_WID
  47.     KEY_REQID
  48.     KEY_WHEELOBJ
  49.     KEY_REQOBJ
  50.   );
  51.   foreach (@LookupKeys) {
  52.     no strict 'refs';
  53.     *{$_} = sub () { $_ };
  54.   }
  55. }
  56.  
  57. use Hash::Registry::XS;
  58.  
  59. sub initialize_lookup {
  60.   my $self = shift;
  61.   my $lfree = Hash::Registry::XS->new();
  62.   $lfree->register_kt(@_) foreach @LookupKeys;
  63.   $self->[SF_LOOKUP] = $lookup;
  64.   my $lused = Hash::Registry::XS->new();
  65.   $lused->register_kt(@_) foreach @LookupKeys;
  66. }
  67.  
  68.  
  69.  
  70. # The connection manager uses a number of data structures, most of
  71. # them arrays.  These constants define offsets into those arrays, and
  72. # the comments document them.
  73.  
  74.  
  75. use constant {
  76.   SKI_SOCKET => 0,
  77.   SKI_KEY    => 1,
  78.   SKI_ATIME  => 2,
  79.   SKI_TIMER  => 3
  80. };
  81.  
  82.  
  83.                                  # @$self = (
  84. #use constant SF_POOL      => 0;  #   \%socket_pool, UNUSED!
  85. use constant SF_QUEUE     => 1;  #   \@request_queue,
  86. #use constant SF_USED      => 2;  #   \%sockets_in_use, UNUSED!
  87. use constant SF_WHEELS    => 3;  #   \%wheels_by_id,
  88. use constant SF_USED_EACH => 4;  #   \%count_by_triple,
  89. use constant SF_MAX_OPEN  => 5;  #   $max_open_count,
  90. use constant SF_MAX_HOST  => 6;  #   $max_per_host,
  91. use constant SF_SOCKETS   => 7;  #   \%socket_xref,
  92. use constant SF_KEEPALIVE => 8;  #   $keep_alive_secs,
  93. use constant SF_TIMEOUT   => 9;  #   $default_request_timeout,
  94. use constant SF_RESOLVER  => 10; #   $poco_client_dns_object,
  95. use constant SF_SHUTDOWN  => 11; #   $shutdown_flag,
  96. use constant SF_REQ_INDEX => 12; #   \%request_id_to_wheel_id,
  97. use constant SF_BIND_ADDR => 13; #   $bind_address,
  98.                                  # );
  99.  
  100. # @request_queue = (
  101. #   $request,
  102. #   $request,
  103. #   ....
  104. # );
  105.  
  106.                                     # $request = [
  107. use constant RQ_SESSION     => 0;   #   $request_session,
  108. use constant RQ_EVENT       => 1;   #   $request_event,
  109. use constant RQ_SCHEME      => 2;   #   $request_scheme,
  110. use constant RQ_ADDRESS     => 3;   #   $request_address,
  111. use constant RQ_IP          => 4;   #   $request_ip,
  112. use constant RQ_PORT        => 5;   #   $request_port,
  113. use constant RQ_CONN_KEY    => 6;   #   $request_connection_key,
  114. use constant RQ_CONTEXT     => 7;   #   $request_context,
  115. use constant RQ_TIMEOUT     => 8;   #   $request_timeout,
  116. use constant RQ_START       => 9;   #   $request_start_time,
  117. use constant RQ_TIMER_ID    => 10;  #   $request_timer_id,
  118. use constant RQ_WHEEL_ID    => 11;  #   $request_wheel_id,
  119. use constant RQ_ACTIVE      => 12;  #   $request_is_active,
  120. use constant RQ_ID          => 13;  #   $request_id,
  121. use constant RQ_ADDR_FAM    => 14;  #   $request_address_family,
  122. use constant RQ_FOR_SCHEME  => 15;  #   $request_address_family,
  123. use constant RQ_FOR_ADDRESS => 16;  #   $request_address_family,
  124. use constant RQ_FOR_PORT    => 17;  #   $request_address_family,
  125.                                     # ];
  126.  
  127. # Create a connection manager.
  128.  
  129. sub new {
  130.   my $class = shift;
  131.   croak "new() needs an even number of parameters" if @_ % 2;
  132.   my %args = @_;
  133.  
  134.   my $max_per_host = delete($args{max_per_host}) || 4;
  135.   my $max_open     = delete($args{max_open})     || 128;
  136.   my $keep_alive   = delete($args{keep_alive})   || 15;
  137.   my $timeout      = delete($args{timeout})      || 120;
  138.   my $resolver     = delete($args{resolver});
  139.   my $bind_address = delete($args{bind_address});
  140.  
  141.   my @unknown = sort keys %args;
  142.   if (@unknown) {
  143.     croak "new() doesn't accept: @unknown";
  144.   }
  145.  
  146.   my $self = bless [
  147.     { },                # SF_POOL
  148.     [ ],                # SF_QUEUE
  149.     { },                # SF_USED
  150.     { },                # SF_WHEELS
  151.     { },                # SF_USED_EACH
  152.     $max_open,          # SF_MAX_OPEN
  153.     $max_per_host,      # SF_MAX_HOST
  154.     { },                # SF_SOCKETS
  155.     $keep_alive,        # SF_KEEPALIVE
  156.     $timeout,           # SF_TIMEOUT
  157.     undef,              # SF_RESOLVER
  158.     undef,              # SF_SHUTDOWN
  159.     undef,              # SF_REQ_INDEX
  160.     $bind_address,      # SF_BIND_ADDR
  161.   ], $class;
  162.  
  163.   $default_resolver = $resolver
  164.     if $resolver && eval { $resolver->isa('POE::Component::Resolver') };
  165.  
  166.   $self->[SF_RESOLVER] = (
  167.     $default_resolver ||= POE::Component::Resolver->new()
  168.   );
  169.  
  170.   POE::Session->create(
  171.     object_states => [
  172.       $self => {
  173.         _start               => "_ka_initialize",
  174.         _stop                => "_ka_stopped",
  175.         ka_add_to_queue      => "_ka_add_to_queue",
  176.         ka_cancel_dns_response => "_ka_cancel_dns_response",
  177.         ka_conn_failure      => "_ka_conn_failure",
  178.         ka_conn_success      => "_ka_conn_success",
  179.         ka_deallocate        => "_ka_deallocate",
  180.         ka_dns_response      => "_ka_dns_response",
  181.         ka_keepalive_timeout => "_ka_keepalive_timeout",
  182.         ka_reclaim_socket    => "_ka_reclaim_socket",
  183.         ka_relinquish_socket => "_ka_relinquish_socket",
  184.         ka_request_timeout   => "_ka_request_timeout",
  185.         ka_resolve_request   => "_ka_resolve_request",
  186.         ka_set_timeout       => "_ka_set_timeout",
  187.         ka_shutdown          => "_ka_shutdown",
  188.         ka_socket_activity   => "_ka_socket_activity",
  189.         ka_wake_up           => "_ka_wake_up",
  190.       },
  191.     ],
  192.   );
  193.  
  194.   return $self;
  195. }
  196.  
  197. # Initialize the hidden session behind this component.
  198. # Set an alias so the public methods can send it messages easily.
  199.  
  200. sub _ka_initialize {
  201.   my ($object, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
  202.   $instances++;
  203.   $heap->{resolve} = { };
  204.   $kernel->alias_set("$object");
  205. }
  206.  
  207. # When programs crash, the session may stop in a non-shutdown state.
  208. # _ka_stopped and DESTROY catch this either way the death occurs.
  209.  
  210. sub _ka_stopped {
  211.   $_[OBJECT][SF_SHUTDOWN] = 1;
  212. }
  213.  
  214. sub DESTROY {
  215.   my $self = shift;
  216.   $self->shutdown();
  217. }
  218.  
  219. # Request to wake up.  This should only happen during the edge
  220. # condition where the component's request queue goes from empty to
  221. # having one item.
  222. #
  223. # It also happens during free(), to see if there are more sockets to
  224. # deal with.
  225. #
  226. # TODO - Make the _ka_wake_up stuff smart enough not to post duplicate
  227. # messages to the queue.
  228.  
  229. sub _ka_wake_up {
  230.   my ($self, $kernel) = @_[OBJECT, KERNEL];
  231.  
  232.   # Scan the list of requests, until we find one that can be met.
  233.   # Fire off POE::Wheel::SocketFactory to begin the connection
  234.   # process.
  235.  
  236.   my $request_index  = 0;
  237.   my $currently_open = keys(%{$self->[SF_USED]}) + keys(%{$self->[SF_SOCKETS]});
  238.   my @splice_list;
  239.  
  240.   QUEUED:
  241.   foreach my $request (@{$self->[SF_QUEUE]}) {
  242.     DEBUG and warn "WAKEUP: checking for $request->[RQ_CONN_KEY]";
  243.  
  244.     # Sweep away inactive requests.
  245.  
  246.     unless ($request->[RQ_ACTIVE]) {
  247.       push @splice_list, $request_index;
  248.       next;
  249.     }
  250.  
  251.     # Skip this request if its scheme/address/port triple is maxed
  252.     # out.
  253.  
  254.     my $req_key = $request->[RQ_CONN_KEY];
  255.     next if (
  256.       ($self->[SF_USED_EACH]{$req_key} || 0) >= $self->[SF_MAX_HOST]
  257.     );
  258.  
  259.     # Honor the request from the free pool, if possible.  The
  260.     # currently open socket count does not increase.
  261.  
  262.     my $existing_connection = $self->_check_free_pool($req_key);
  263.     if ($existing_connection) {
  264.       push @splice_list, $request_index;
  265.  
  266.       _respond(
  267.         $request, {
  268.           connection => $existing_connection,
  269.           from_cache => "deferred",
  270.         }
  271.       );
  272.      
  273.       # Remove the wheel-to-request index.
  274.      
  275.       #NB we don't need to really do this, assuming that the request itself
  276.       #only exists in the queue, this entry should be garbage collected.
  277.       $self->[SF_REQ_INDEX]->delete_value($request);
  278.      
  279.       _free_req_id($request->[RQ_ID]);
  280.  
  281.       next;
  282.     }
  283.  
  284.     # we can't easily take this out of the outer loop since _check_free_pool
  285.     # can change it from under us
  286.     my @free_sockets = $self->[SF_SOCKETS]->fetch_a(1, ATTR_SOCKET_FREE);
  287.    
  288.  
  289.     # Try to free over-committed (but unused) sockets until we're back
  290.     # under SF_MAX_OPEN sockets.  Bail out if we can't free enough.
  291.     # TODO - Consider removing @free_sockets in least- to
  292.     # most-recently used order.
  293.     while ($currently_open >= $self->[SF_MAX_OPEN]) {
  294.       last QUEUED unless @free_sockets;
  295.       my $next_to_go = $free_sockets[rand(@free_sockets)];
  296.       $self->_remove_socket_from_pool($next_to_go);
  297.       $currently_open--;
  298.     }
  299.  
  300.     # Start the request.  Create a wheel to begin the connection.
  301.     # Move the wheel and its request into SF_WHEELS.
  302.     DEBUG and warn "WAKEUP: creating wheel for $req_key";
  303.  
  304.     my $addr = ($request->[RQ_IP] or $request->[RQ_ADDRESS]);
  305.     my $wheel = POE::Wheel::SocketFactory->new(
  306.       (
  307.         defined($self->[SF_BIND_ADDR])
  308.         ? (BindAddress => $self->[SF_BIND_ADDR])
  309.         : ()
  310.       ),
  311.       RemoteAddress => $addr,
  312.       RemotePort    => $request->[RQ_PORT],
  313.       SuccessEvent  => "ka_conn_success",
  314.       FailureEvent  => "ka_conn_failure",
  315.       SocketDomain  => $request->[RQ_ADDR_FAM],
  316.     );
  317.     #Make the connecting wheel dependent on the request ID..
  318.    
  319.     #XXX: We need to 'strongify' the reqid somehow..
  320.    
  321.     $self->[SF_WHEELS]->store_kt($request, KEY_REQOBJ, StrongValue => 1);
  322.     $self->[SF_WHEELS]->store_kt($wheel->ID, KEY_WID, $wheel);
  323.     $self->[SF_REQ_INDEX]->store_kt($wheel, KEY_WHEELOBJ, $request);
  324.    
  325.     # store the wheel's ID in the request object
  326.     $request->[RQ_WHEEL_ID] = $wheel->ID;
  327.  
  328.     # Count it as used, so we don't over commit file handles.
  329.     $currently_open++;
  330.     $self->[SF_USED_EACH]{$req_key}++;
  331.  
  332.     # Mark the request index as one to splice out.
  333.  
  334.     push @splice_list, $request_index;
  335.   }
  336.   continue {
  337.     $request_index++;
  338.   }
  339.  
  340.   # The @splice_list is a list of element indices that need to be
  341.   # spliced out of the request queue.  We scan in backwards, from
  342.   # highest index to lowest, so that each splice does not affect the
  343.   # indices of the other.
  344.   #
  345.   # This removes the request from the queue.  It's vastly important
  346.   # that the request be entered into SF_WHEELS before now.
  347.  
  348.   my $splice_index = @splice_list;
  349.   while ($splice_index--) {
  350.     splice @{$self->[SF_QUEUE]}, $splice_list[$splice_index], 1;
  351.   }
  352. }
  353.  
  354. sub allocate {
  355.   my $self = shift;
  356.   croak "allocate() needs an even number of parameters" if @_ % 2;
  357.   my %args = @_;
  358.  
  359.   # TODO - Validate arguments.
  360.  
  361.   my $scheme  = delete $args{scheme};
  362.   croak "allocate() needs a 'scheme'"  unless $scheme;
  363.   my $address = delete $args{addr};
  364.   croak "allocate() needs an 'addr'"   unless $address;
  365.   my $port    = delete $args{port};
  366.   croak "allocate() needs a 'port'"    unless $port;
  367.   my $event   = delete $args{event};
  368.   croak "allocate() needs an 'event'"  unless $event;
  369.   my $context = delete $args{context};
  370.   croak "allocate() needs a 'context'" unless $context;
  371.   my $timeout = delete $args{timeout};
  372.   $timeout    = $self->[SF_TIMEOUT]    unless $timeout;
  373.  
  374.   my $for_scheme  = delete($args{for_scheme}) || $scheme;
  375.   my $for_address = delete($args{for_addr}) || $address;
  376.   my $for_port    = delete($args{for_port}) || $port;
  377.  
  378.   croak "allocate() on shut-down connection manager" if $self->[SF_SHUTDOWN];
  379.  
  380.   my @unknown = sort keys %args;
  381.   if (@unknown) {
  382.     croak "allocate() doesn't accept: @unknown";
  383.   }
  384.  
  385.   my $conn_key = (
  386.     "$scheme $address $port for $for_scheme $for_address $for_port"
  387.   );
  388.  
  389.   # If we have a connection pool for the scheme/address/port triple,
  390.   # then we can maybe post an available connection right away.
  391.  
  392.   my $existing_connection = $self->_check_free_pool($conn_key);
  393.   if (defined $existing_connection) {
  394.     $poe_kernel->post(
  395.       $poe_kernel->get_active_session,
  396.       $event => {
  397.         addr       => $address,
  398.         context    => $context,
  399.         port       => $port,
  400.         scheme     => $scheme,
  401.         connection => $existing_connection,
  402.         from_cache => "immediate",
  403.       }
  404.     );
  405.     return;
  406.   }
  407.  
  408.   # We can't honor the request immediately, so it's put into a queue.
  409.   DEBUG and warn "ALLOCATE: enqueuing request for $conn_key";
  410.   my $rqid = ++$CurrentID;
  411.   my $request = [
  412.     $poe_kernel->get_active_session(),  # RQ_SESSION
  413.     $event,       # RQ_EVENT
  414.     $scheme,      # RQ_SCHEME
  415.     $address,     # RQ_ADDRESS
  416.     undef,        # RQ_IP
  417.     $port,        # RQ_PORT
  418.     $conn_key,    # RQ_CONN_KEY
  419.     $context,     # RQ_CONTEXT
  420.     $timeout,     # RQ_TIMEOUT
  421.     time(),       # RQ_START
  422.     undef,        # RQ_TIMER_ID
  423.     undef,        # RQ_WHEEL_ID
  424.     1,            # RQ_ACTIVE
  425.     $rqid, # RQ_ID
  426.     undef,        # RQ_ADDR_FAM
  427.     $for_scheme,  # RQ_FOR_SCHEME
  428.     $for_address, # RQ_FOR_ADDRESS
  429.     $for_port,    # RQ_FOR_PORT
  430.   ];
  431.  
  432.   $self->[SF_REQ_INDEX]->store_kt($rqid, KEY_REQID, $request, StrongValue => 1);
  433.  
  434.   $poe_kernel->refcount_increment(
  435.     $request->[RQ_SESSION]->ID(),
  436.     "poco-client-keepalive"
  437.   );
  438.  
  439.   $poe_kernel->call("$self", ka_set_timeout     => $request);
  440.   $poe_kernel->call("$self", ka_resolve_request => $request);
  441.  
  442.   return $request->[RQ_ID];
  443. }
  444.  
  445. sub deallocate {
  446.   my ($self, $req_id) = @_;
  447.  
  448.   my $request = $self->[SF_REQ_INDEX]->delete_value_by_key_kt($req_id, KEY_REQID);
  449.  
  450.   croak "deallocate() requires a request ID" unless defined $request;
  451.  
  452.   # Now pass the vetted request & its ID into our manager session.
  453.   $poe_kernel->call("$self", "ka_deallocate", $request, $req_id);
  454. }
  455.  
  456. sub _ka_deallocate {
  457.   my ($self, $heap, $request, $req_id) = @_[OBJECT, HEAP, ARG0, ARG1];
  458.  
  459.   my $conn_key = $request->[RQ_CONN_KEY];
  460.   my $existing_connection = $self->_check_free_pool($conn_key);
  461.  
  462.   # Existing connection.  Remove it from the pool, and delete the socket.
  463.   if (defined $existing_connection) {
  464.     $self->_remove_socket_from_pool($existing_connection->{socket});
  465.     DEBUG_DEALLOCATE and warn(
  466.       "deallocate called, deleted already-connected socket"
  467.     );
  468.     return;
  469.   }
  470.  
  471.   # No connection yet.  Cancel the request.
  472.   DEBUG_DEALLOCATE and warn(
  473.     "deallocate called without an existing connection.  ",
  474.     "cancelling connection request"
  475.   );
  476.  
  477.   unless (exists $heap->{resolve}->{$request->[RQ_ADDRESS]}) {
  478.     DEBUG_DEALLOCATE and warn(
  479.       "deallocate cannot cancel dns -- no pending request"
  480.     );
  481.     return;
  482.   }
  483.  
  484.   if ($heap->{resolve}->{$request->[RQ_ADDRESS]} eq 'cancelled') {
  485.     DEBUG_DEALLOCATE and warn(
  486.       "deallocate cannot cancel dns -- request already cancelled"
  487.     );
  488.     return;
  489.   }
  490.  
  491.   $poe_kernel->call( "$self", ka_cancel_dns_response => $request );
  492.   return;
  493. }
  494.  
  495. sub _ka_cancel_dns_response {
  496.   my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0];
  497.  
  498.   my $address = $request->[RQ_ADDRESS];
  499.   DEBUG_DNS and warn "DNS: canceling request for $address\n";
  500.   my $requests = $heap->{resolve}{$address};
  501.  
  502.   # Remove the resolver request for the address of this connection
  503.   # request
  504.  
  505.   my $req_index = @$requests;
  506.   while ($req_index--) {
  507.     next unless $requests->[$req_index] == $request;
  508.     splice(@$requests, $req_index, 1);
  509.     last;
  510.   }
  511.  
  512.   # Clean up the structure for the address if there are no more
  513.   # requests to resolve that address.
  514.  
  515.   unless (@$requests) {
  516.     DEBUG_DNS and warn "DNS: canceled all requests for $address";
  517.     $heap->{resolve}{$address} = 'cancelled';
  518.   }
  519.  
  520.   # cancel our attempt to connect
  521.   $poe_kernel->alarm_remove( $request->[RQ_TIMER_ID] );
  522.   $poe_kernel->refcount_decrement(
  523.     $request->[RQ_SESSION]->ID(), "poco-client-keepalive"
  524.   );
  525. }
  526.  
  527. # Set the request's timeout, in the component's context.
  528.  
  529. sub _ka_set_timeout {
  530.   my ($kernel, $request) = @_[KERNEL, ARG0];
  531.   $request->[RQ_TIMER_ID] = $kernel->delay_set(
  532.     ka_request_timeout => $request->[RQ_TIMEOUT], $request
  533.   );
  534. }
  535.  
  536. # The request has timed out.  Mark it as defunct, and respond with an
  537. # ETIMEDOUT error.
  538.  
  539. sub _ka_request_timeout {
  540.   my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0];
  541.  
  542.   DEBUG and warn(
  543.     "CON: request from session ", $request->[RQ_SESSION]->ID,
  544.     " for address ", $request->[RQ_ADDRESS], " timed out"
  545.   );
  546.   $! = ETIMEDOUT;
  547.  
  548.   # The easiest way to do this?  Simulate an error from the wheel
  549.   # itself.
  550.  
  551.   if (defined $request->[RQ_WHEEL_ID]) {
  552.     @_[ARG0..ARG3] = ("connect", $!+0, "$!", $request->[RQ_WHEEL_ID]);
  553.     goto &_ka_conn_failure;
  554.   }
  555.  
  556.   # But what if there is no wheel?
  557.   _respond_with_error($request, "connect", $!+0, "$!"),
  558. }
  559.  
  560. # Connection failed.  Remove the SF_WHEELS record corresponding to the
  561. # request.  Remove the SF_USED placeholder record so it won't count
  562. # anymore.  Send a failure notice to the requester.
  563.  
  564. sub _ka_conn_failure {
  565.   my ($self, $func, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3];
  566.  
  567.   DEBUG and warn "CON: sending $errstr for function $func";
  568.   # Remove the SF_WHEELS record.
  569.  
  570.   my $wheel = $self->[SF_WHEELS]->delete_value_by_key_kt($wheel, KEY_WID);
  571.   my $ski = $self->[SF_SOCKETS]->fetch_kt($wheel, KEY_WHEELOBJ);
  572.   my $request = $self->[SF_REQ_INDEX]->delete_value_by_key_kt($wheel, KEY_WHEELOBJ);
  573.  
  574.   $self->_ski_remove($ski);
  575.    
  576.   # Discount the use by request key, removing the SF_USED record
  577.   # entirely if it's now moot.
  578.   my $request_key = $request->[RQ_CONN_KEY];
  579.   $self->_decrement_used_each($request_key);
  580.  
  581.   # Tell the requester about the failure.
  582.   _respond_with_error($request, $func, $errnum, $errstr),
  583. }
  584.  
  585. # Connection succeeded.  Remove the SF_WHEELS record corresponding to
  586. # the request.  Flesh out the placeholder SF_USED record so it counts.
  587.  
  588. sub _ka_conn_success {
  589.   my ($self, $socket, $wheel_id) = @_[OBJECT, ARG0, ARG3];
  590.  
  591.   my $wheel = $self->[SF_WHEELS]->delete_value_by_key_kt($wheel_id, KEY_WID);
  592.   my $request = $self->[SF_REQ_INDEX]->delete_value_by_key_kt($wheel, KEY_WHEELOBJ);
  593.   # Remove the SF_WHEELS record.
  594.  
  595.  
  596.  
  597.   if ($request->[RQ_SCHEME] eq 'https') {
  598.     unless ($ssl_available) {
  599.       die "There is no SSL support, please install POE::Component::SSLify";
  600.     }
  601.     eval {
  602.       $socket = POE::Component::SSLify::Client_SSLify($socket);
  603.     };
  604.     if ($@) {
  605.       _respond_with_error($request, "sslify", undef, "$@");
  606.       return;
  607.     }
  608.   }
  609.  
  610.   my $ski = [
  611.     $socket, #SKI_SOCKET,
  612.     $request->[RQ_CONN_KEY], #SKI_KEY,
  613.     time(), #SKI_ATIME,
  614.     undef, #SKI_TIMER
  615.   ];
  616.  
  617.   weaken($ski->[SKI_SOCKET]);
  618.   $self->_ski_mark_used($ski);
  619.    
  620.   DEBUG and warn(
  621.     "CON: posting... to $request->[RQ_SESSION] . $request->[RQ_EVENT]"
  622.   );
  623.  
  624.   # Build a connection object around the socket.
  625.   my $connection = POE::Component::Connection::Keepalive->new(
  626.     socket  => $socket,
  627.     manager => $self,
  628.   );
  629.  
  630.   # Give the socket to the requester.
  631.   _respond(
  632.     $request, {
  633.       connection => $connection,
  634.     }
  635.   );
  636. }
  637.  
  638. # The user is done with a socket.  Make it available for reuse.
  639.  
  640. sub free {
  641.   my ($self, $socket) = @_;
  642.  
  643.  
  644.   return if $self->[SF_SHUTDOWN];
  645.   DEBUG and warn "FREE: freeing socket";
  646.   my $ski = $self->[SF_SOCKETS]->fetch_kt($socket, KEY_SOCKET);
  647.  
  648.   # Remove the accompanying SF_USED record.
  649.   croak "can't free() undefined socket" unless defined $ski;
  650.  
  651.   # Reclaim the socket.
  652.   $poe_kernel->call("$self", "ka_reclaim_socket", $used);
  653.  
  654.   # Avoid returning things by mistake.
  655.   return;
  656. }
  657.  
  658. # A sink for deliberately unhandled events.
  659.  
  660. sub _ka_ignore_this_event {
  661.   # Do nothing.
  662. }
  663.  
  664. # An internal method to fetch a socket from the free pool, if one
  665. # exists.
  666. sub _ski_mark_used {
  667.   my ($self,$ski) = @_;
  668.   my $table = $self->[SF_SOCKETS];
  669.   my $key = $ski->[SKI_KEY];
  670.   $table->delete_attr_from_value(1, ATTR_SOCKET_FREE, $ski);
  671.   $table->delete_attr_from_value($key, ATTR_CONNKEY_FREE, $ski);
  672.  
  673.   $table->store_a(1, ATTR_SOCKET_USED, $ski);
  674.   $table->store_a($key, ATTR_CONNKEY_BUSY);  
  675. }
  676.  
  677. sub _ski_mark_free {
  678.   my ($self,$ski) = @_;
  679.   my $table = $self->[SF_SOCKETS];
  680.   my $key = $ski->[SKI_KEY];
  681.   $table->delete_attr_from_value(1, ATTR_SOCKET_USED, $ski);
  682.   $table->delete_attr_from_value($key, ATTR_CONNKEY_BUSY, $ski);
  683.  
  684.   $table->store_a(1, ATTR_SOCKET_FREE, $ski);
  685.   $table->store_a($key, ATTR_CONNKEY_FREE);  
  686. }
  687.  
  688. sub _check_free_pool {
  689.   my ($self, $conn_key) = @_;
  690.  
  691.   #Get all free sockets for this connection
  692.   my @free = $self->[SF_SOCKETS]->fetch_a($conn_key, ATTR_CONNKEY_FREE);
  693.   return unless @free;
  694.  
  695.   my $ski = shift @free;
  696.  
  697.   #mark as used  
  698.   $self->_ski_mark_used($ski);
  699.  
  700.   DEBUG and warn "CHECK: reusing $conn_key";
  701.  
  702.   # _check_free_pool() may be operating in another session, so we call
  703.   # the correct one here.
  704.  
  705.   $poe_kernel->call("$self", "ka_relinquish_socket", $ski);
  706.  
  707.   $ski->[SKI_ATIME] = time();
  708.   $self->[SF_USED_EACH]{$conn_key}++;
  709.  
  710.     # Build a connection object around the socket.
  711.     my $connection = POE::Component::Connection::Keepalive->new(
  712.       socket  => $ski->[SKI_SOCKET],
  713.       manager => $self,
  714.     );
  715.    
  716.   return $connection;
  717. }
  718.  
  719. sub _decrement_used_each {
  720.   my ($self, $request_key) = @_;
  721.   unless (--$self->[SF_USED_EACH]{$request_key}) {
  722.     delete $self->[SF_USED_EACH]{$request_key};
  723.   }
  724. }
  725.  
  726. # Reclaim a socket.  Put it in the free socket pool, and wrap it with
  727. # select_read() to discard any data and detect when it's closed.
  728.  
  729. sub _ka_reclaim_socket {
  730.   my ($self, $kernel, $ski) = @_[OBJECT, KERNEL, ARG0];
  731.  
  732.   my $socket = $ski->[SKI_SOCKET];
  733.    
  734.   # Decrement the usage counter for the given connection key.
  735.   my $request_key = $ski->[SKI_KEY];
  736.   $self->_decrement_used_each($request_key);
  737.  
  738.   if(!defined fileno $socket) {
  739.     DEBUG and warn "RECLAIM: freed socket has previously been closed";
  740.     $self->_ski_remove($ski);
  741.     goto &_ka_wake_up;
  742.   }
  743.  
  744.   # Socket is still open.  Check for lingering data.
  745.   DEBUG and warn "RECLAIM: checking if socket still works";
  746.  
  747.   # Check for data on the socket, which implies that the server
  748.   # doesn't know we're done.  That leads to desynchroniziation on the
  749.   # protocol level, which strongly implies that we can't reuse the
  750.   # socket.  In this case, we'll make a quick attempt at fetching all
  751.   # the data, then close the socket.
  752.  
  753.   my $rin = '';
  754.   vec($rin, fileno($socket), 1) = 1;
  755.   my ($rout, $eout);
  756.   my $socket_is_active = select ($rout=$rin, undef, $eout=$rin, 0);
  757.  
  758.   if ($socket_is_active) {
  759.     DEBUG and warn "RECLAIM: socket is still active; trying to drain";
  760.     use bytes;
  761.  
  762.     my $socket_had_data = sysread($socket, my $buf = "", 65536) || 0;
  763.     DEBUG and warn "RECLAIM: socket had $socket_had_data bytes. 0 means EOF";
  764.     DEBUG and warn "RECLAIM: Giving up on socket.";
  765.  
  766.     # Avoid common FIN_WAIT_2 issues, but only for valid sockets.
  767.     #if ($socket_had_data and fileno($socket)) {
  768.     if ($socket_had_data) {
  769.       my $opt_result = setsockopt(
  770.         $socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0)
  771.       );
  772.       die "setsockopt: " . ($!+0) . " $!" if (not $opt_result and $!  != EBADF);
  773.     }
  774.     $self->_ski_remove($ski);
  775.     goto &_ka_wake_up;
  776.   }
  777.  
  778.   # Socket is alive and has no data, so it's in a quiet, theoretically
  779.   # reclaimable state.
  780.  
  781.   DEBUG and warn "RECLAIM: reclaiming socket";
  782.  
  783.   # Watch the socket, and set a keep-alive timeout.
  784.   $kernel->select_read($socket, "ka_socket_activity");
  785.   my $timer_id = $kernel->delay_set(
  786.     ka_keepalive_timeout => $self->[SF_KEEPALIVE], $ski
  787.   );
  788.  
  789.   $self->_ski_mark_free($ski);
  790.  
  791.   goto &_ka_wake_up;
  792. }
  793.  
  794. # Socket timed out.  Discard it.
  795.  
  796. sub _ka_keepalive_timeout {
  797.   my ($self, $ski) = @_[OBJECT, ARG0];
  798.   $self->_ski_remove($ski);
  799. }
  800.  
  801. # Relinquish a socket.  Stop selecting on it.
  802.  
  803. sub _ka_relinquish_socket {
  804.   my ($kernel, $ski) = @_[KERNEL, ARG0];
  805.   $kernel->alarm_remove($ski->[SKI_TIMER]);
  806.   $kernel->select_read($ski->[SKI_SOCKET], undef);
  807. }
  808.  
  809. # Shut down the component.  Release any sockets we're currently
  810. # holding onto.  Clean up any timers.  Remove the alias it's known by.
  811.  
  812. sub shutdown {
  813.   my $self = shift;
  814.   return if $self->[SF_SHUTDOWN];
  815.   $poe_kernel->call("$self", "ka_shutdown");
  816. }
  817.  
  # Session-side half of shutdown() (dispatched there as "ka_shutdown").
  #
  # Runs at most once per component.  In order, it:
  #   * drops this instance from the $instances count,
  #   * releases every queued request,
  #   * removes the sockets fetched under ATTR_SOCKET_FREE,
  #   * releases every request still waiting on a DNS lookup,
  #   * shuts down the resolver(s), and
  #   * removes the session alias and sets SF_SHUTDOWN.
  #
  # Returns nothing.

  sub _ka_shutdown {
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    # A second run would decrement $instances twice.
    return if $self->[SF_SHUTDOWN];

    $instances--;

    # Clean out the request queue.
    foreach my $request (@{$self->[SF_QUEUE]}) {
      $self->_shutdown_request($kernel, $request);
    }
    $self->[SF_QUEUE] = [ ];

    # Clean out the socket pool.

    #TODO: Implement 'all values' or whatever API function for HR

    my @skis = $self->[SF_SOCKETS]->fetch_a(1, ATTR_SOCKET_FREE);
    foreach my $ski (@skis) {
      $self->_ski_remove($ski);
    }

    # Stop any pending resolver requests.  An entry is either a list of
    # waiting requests or the string 'cancelled', which has nothing left
    # to release.
    foreach my $host (keys %{$heap->{resolve}}) {
      if ($heap->{resolve}{$host} eq 'cancelled') {
        DEBUG and warn "SHT: Skipping shutdown for $host (already cancelled)";
        next;
      }
      DEBUG and warn "SHT: Shutting down resolver requests for $host";
      foreach my $request (@{$heap->{resolve}{$host}}) {
        $self->_shutdown_request($kernel, $request);
      }
    }
    $heap->{resolve} = { };

    # Shut down the resolver.  The shared default resolver is handled
    # separately below, so only shut down a resolver that is not the
    # default one.  $default_resolver may be undefined, so check
    # definedness before comparing: a numeric comparison against undef
    # warns, and calling shutdown() on an undefined resolver would die.
    DEBUG and warn "SHT: Shutting down resolver";
    my $resolver = $self->[SF_RESOLVER];
    my $is_default = (
      defined $resolver
      and defined $default_resolver
      and $resolver == $default_resolver
    );
    if (defined $resolver and !$is_default) {
      $resolver->shutdown();
    }
    $self->[SF_RESOLVER] = undef;

    # The default resolver goes away with the last live instance.
    if ( $default_resolver and !$instances ) {
      $default_resolver->shutdown();
      $default_resolver = undef;
    }

    # Finish keepalive's shutdown.
    $kernel->alias_remove("$self");
    $self->[SF_SHUTDOWN] = 1;

    return;
  }
  871.  
  # Release everything the component holds on behalf of one request.
  # Plain method, called with ($kernel, $request).
  #
  # The request is removed from the SF_REQ_INDEX registry, its alarm
  # (if any) is removed, its wheel (if any) is dropped from SF_WHEELS,
  # and the "poco-client-keepalive" reference count on the requesting
  # session (if any) is decremented.

  sub _shutdown_request {
    my ($self, $kernel, $request) = @_;
    $self->[SF_REQ_INDEX]->delete_value($request);

    if (defined $request->[RQ_TIMER_ID]) {
      DEBUG and warn "SHT: Shutting down resolver timer $request->[RQ_TIMER_ID]";
      $kernel->alarm_remove($request->[RQ_TIMER_ID]);
    }

    if (defined $request->[RQ_WHEEL_ID]) {
      # Report the wheel ID here, not the timer ID: the timer ID may be
      # undefined in this branch and is the wrong value to log anyway.
      DEBUG and warn "SHT: Shutting down resolver wheel $request->[RQ_WHEEL_ID]";
      delete $self->[SF_WHEELS]{$request->[RQ_WHEEL_ID]};
    }

    if (defined $request->[RQ_SESSION]) {
      my $session_id = $request->[RQ_SESSION]->ID;
      DEBUG and warn "SHT: Releasing session $session_id";
      $kernel->refcount_decrement($session_id, "poco-client-keepalive");
    }
  }
  892.  
  # Event handler: a socket that is being watched while idle became
  # readable.  Whatever arrived is read once and thrown away, and the
  # socket's record is removed from the pool regardless of whether the
  # read returned data, EOF or an error.
  #
  # ARG0 is the socket handle.

  sub _ka_socket_activity {
    my ($self, $kernel, $socket) = @_[OBJECT, KERNEL, ARG0];

    # Look the socket's record up by the handle itself.
    my $ski = $self->[SF_SOCKETS]->fetch_kt($socket, KEY_SOCKET);

    DEBUG and warn "CON: Got activity on socket for $ski->[SKI_KEY]";

    # Any socket activity on a kept-alive socket implies that the socket
    # is no longer reusable.

    use bytes;
    my $discarded = "";
    my $socket_had_data = sysread($socket, $discarded, 65536) || 0;

    if (DEBUG) {
      warn "CON: socket had $socket_had_data bytes. 0 means EOF";
      warn "CON: Removing socket from the pool";
    }

    $self->_ski_remove($ski);
  }
  914.  
  # Event handler: make sure a request's address is resolved before the
  # request is queued.  ARG0 is the request record.
  #
  #   * Dotted-quad IPv4 addresses skip DNS and go straight to
  #     ka_add_to_queue.
  #   * A host that already has an entry in $heap->{resolve} shares the
  #     lookup that is already outstanding.
  #   * Otherwise a new lookup is started; its answer arrives as the
  #     'ka_dns_response' event.

  sub _ka_resolve_request {
    my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0];

    my $host = $request->[RQ_ADDRESS];

    # Skip DNS resolution if it's already a dotted quad.
    # ip_is_ipv4() doesn't require quads, so we count the dots.
    #
    # TODO - Do the same for IPv6 addresses containing colons?
    # TODO - Would require AF_INET6 support around the SocketFactory.
    if ((($host =~ tr[.][.]) == 3) and ip_is_ipv4($host)) {
      DEBUG_DNS and warn "DNS: $host is a dotted quad; skipping lookup";
      $kernel->call("$self", ka_add_to_queue => $request);
      return;
    }

    # It's already pending DNS resolution.  Combine this with previous.
    if (exists $heap->{resolve}->{$host}) {
      DEBUG_DNS and warn "DNS: $host is piggybacking on a pending lookup.\n";

      my $pending = $heap->{resolve}->{$host};
      if (ref $pending eq 'ARRAY') {
        push @$pending, $request;
      }
      else {
        # The entry is not a request list.  Other code in this file
        # stores the string 'cancelled' here while a response is still
        # expected; pushing onto that string would die under strict
        # refs.  Start a fresh list so the expected response serves
        # this request.
        $heap->{resolve}->{$host} = [ $request ];
      }
      return;
    }

    # New request.  Start lookup.
    $heap->{resolve}->{$host} = [ $request ];

    $self->[SF_RESOLVER]->resolve(
      event   => 'ka_dns_response',
      host    => $host,
      service => $request->[RQ_SCHEME],
    );

    DEBUG_DNS and warn "DNS: looking up $host in the background.\n";
  }
  949.  
  # Event handler: a resolver lookup has finished.
  #
  #   ARG0 - resolver error (false on success)
  #   ARG1 - array reference of address records
  #   ARG2 - the resolver's own request hash; its {host} is the name
  #          that was looked up
  #
  # All keepalive requests waiting on that host are taken out of
  # $heap->{resolve}.  They are failed when the lookup failed or yielded
  # no address; otherwise the still-active ones are stamped with the
  # first address and passed on to ka_add_to_queue.
  #
  # Dies when the bookkeeping for the host is missing or is neither a
  # request list nor the 'cancelled' marker.

  sub _ka_dns_response {
    my ($self, $kernel, $heap, $response_error, $addresses, $lookup) = @_[
      OBJECT, KERNEL, HEAP, ARG0..ARG2
    ];

    # We've shut down.  Nothing to do here.
    return if $self->[SF_SHUTDOWN];

    my $request_address = $lookup->{host};
    my $requests = delete $heap->{resolve}->{$request_address};

    DEBUG_DNS and warn "DNS: got response for request address $request_address";

    # There must be something on record for this host.
    unless (defined $requests) {
      die "DNS: Unexpectedly undefined requests for $request_address";
    }

    # We can receive responses for canceled requests.  Ignore them: we
    # cannot cancel PoCo::Client::DNS requests, so this is how we reap
    # them when they're canceled.
    if ($requests eq 'cancelled') {
      DEBUG_DNS and warn "DNS: reaping cancelled request for $request_address";
      return;
    }

    unless (ref $requests eq 'ARRAY') {
      die "DNS: got an unknown requests for $request_address: $requests";
    }

    # This is an error.  Cancel all requests for the address.
    # Tell everybody that their requests failed.
    if ($response_error) {
      DEBUG_DNS and warn "DNS: resolver error = $response_error";
      foreach my $waiting (@$requests) {
        _respond_with_error($waiting, "resolve", undef, $response_error);
      }
      return;
    }

    DEBUG_DNS and warn "DNS: got a response";

    # A response!  Only the first address record is used: the loop body
    # returns at the end of its first iteration.
    foreach my $address_rec (@$addresses) {
      my $numeric = $self->[SF_RESOLVER]->unpack_addr($address_rec);

      DEBUG_DNS and warn "DNS: $request_address resolves to $numeric";

      foreach my $waiting (@$requests) {
        # Don't bother continuing inactive requests.
        next unless $waiting->[RQ_ACTIVE];
        $waiting->[RQ_IP] = $numeric;
        $waiting->[RQ_ADDR_FAM] = $address_rec->{family};
        $kernel->yield(ka_add_to_queue => $waiting);
      }

      # Return after the first good answer.
      return;
    }

    # Didn't return here.  No address record for the host?
    foreach my $waiting (@$requests) {
      DEBUG_DNS and warn "DNS: $request_address does not resolve";
      _respond_with_error($waiting, "resolve", undef, "Host has no address.");
    }
  }
  1016.  
  1017.  
  # Event handler: append a request (ARG0) to the pending queue and, if
  # it can possibly be served right now, post a "ka_wake_up" event to
  # the component's session.
  #
  # No wakeup is posted when the queue already held requests, when the
  # number of in-use sockets has reached SF_MAX_OPEN, or when the
  # request's connection key has reached SF_MAX_HOST.
  #
  # Returns nothing.

  sub _ka_add_to_queue {
    my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0];

    push @{ $self->[SF_QUEUE] }, $request;

    # If the queue has more than one request in it, then it already has
    # a wakeup event pending.  We don't need to send another one.

    return if @{$self->[SF_QUEUE]} > 1;

    # If the component's allocated socket count is maxed out, then it
    # will check the queue when an existing socket is released.  We
    # don't need to wake it up here.
    #
    # Count the records in list context ("() =" yields the number of
    # elements returned).  Forcing scalar context on the call itself
    # would make the count depend on what fetch_a() returns in scalar
    # context.
    # NOTE(review): fetch_a() is assumed to return the matching records
    # as a list, as its other call site in this file uses it -- confirm.
    my $use_count = () = $self->[SF_SOCKETS]->fetch_a(1, ATTR_SOCKET_USED);

    return if $use_count >= $self->[SF_MAX_OPEN];

    # Likewise, we shouldn't awaken the session if there are no
    # available slots for the given scheme/address/port triple.  "|| 0"
    # to avoid an undef error.

    my $conn_key = $request->[RQ_CONN_KEY];
    return if (
      ($self->[SF_USED_EACH]{$conn_key} || 0) >= $self->[SF_MAX_HOST]
    );

    # Wake the session up, and return nothing, signifying sound and fury
    # yet to come.
    DEBUG and warn "posting wakeup for $conn_key";
    $poe_kernel->post("$self", "ka_wake_up");
    return;
  }
  1050.  
  # Tear down one socket record.  Takes the record itself, not the
  # socket handle: the record is deleted from the SF_SOCKETS registry,
  # the alarm stored in its SKI_TIMER slot is removed, and the read
  # watcher on its SKI_SOCKET handle is switched off.
  sub _ski_remove {
    my $self = shift;
    my $ski  = shift;

    # Forget the record first; $ski keeps it alive for the calls below.
    $self->[SF_SOCKETS]->delete_by_value($ski);

    # Stop the record's timer.
    $poe_kernel->alarm_remove(
      $ski->[SKI_TIMER]
    );

    # An undef event name stops read-watching the handle.
    $poe_kernel->select_read(
      $ski->[SKI_SOCKET],
      undef,
    );
  }
  1058.  
  # Remove a socket from the pool, given the socket handle itself.  The
  # handle is mapped to its record through the KEY_SOCKET lookup and the
  # record is then handed to _ski_remove().
  #
  # About FIN_WAIT_2: an earlier version set SO_LINGER on the socket
  # here to avoid common FIN_WAIT_2 issues.  That code is disabled
  # because fileno() will return true for closed sockets, which makes
  # setsockopt() highly unhappy.  Also, SO_LINGER will cause the socket
  # closure to block, which is less than ideal.  We need to revisit this
  # another way, or just let sockets enter FIN_WAIT_2.
  #
  # The disabled code, kept for reference:
  #
  #  if (fileno $socket) {
  #    setsockopt($socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0)) or die(
  #      "setsockopt: $!"
  #    );
  #  }

  sub _remove_socket_from_pool {
    my $self   = shift;
    my $socket = shift;

    my $record = $self->[SF_SOCKETS]->fetch_kt($socket, KEY_SOCKET);

    return $self->_ski_remove($record);
  }
  1077.  
  # Internal function.  NOT AN EVENT HANDLER.
  #
  # Report a failure to the requester.  Builds the error fields (no
  # connection, plus the failing function name, error number and error
  # string) and hands them to _respond().

  sub _respond_with_error {
    my ($request, $func, $num, $string) = @_;

    my %failure = (
      connection => undef,
      function   => $func,
      error_num  => $num,
      error_str  => $string,
    );

    return _respond($request, \%failure);
  }
  1092.  
  # Internal function, not an event handler.
  #
  # Deliver a response for $request to the session that made it, then
  # retire the request.  $fields is a hash reference whose entries are
  # merged over the request-derived fields of the response.
  #
  # Does nothing unless the request is still active and has a session.
  # Otherwise it posts the response, decrements the session's
  # "poco-client-keepalive" reference count, removes the request's timer
  # (if any) and marks the request inactive.

  sub _respond {
    my ($request, $fields) = @_;

    # Bail out early if the request isn't active.
    my $requester = $request->[RQ_SESSION];
    return unless $request->[RQ_ACTIVE] and $requester;

    # Request-derived fields first, so the caller's fields win.
    my %response = (
      addr        => $request->[RQ_ADDRESS],
      context     => $request->[RQ_CONTEXT],
      port        => $request->[RQ_PORT],
      scheme      => $request->[RQ_SCHEME],
      for_addr    => $request->[RQ_FOR_ADDRESS],
      for_scheme  => $request->[RQ_FOR_SCHEME],
      for_port    => $request->[RQ_FOR_PORT],
      %$fields,
    );

    $poe_kernel->post($requester, $request->[RQ_EVENT], \%response);

    # Drop the extra refcount.
    my $requester_id = $requester->ID();
    $poe_kernel->refcount_decrement($requester_id, "poco-client-keepalive");

    # Remove associated timer.
    my $timer_id = $request->[RQ_TIMER_ID];
    if ($timer_id) {
      $poe_kernel->alarm_remove($timer_id);
      $request->[RQ_TIMER_ID] = undef;
    }

    # Deactivate the request.
    $request->[RQ_ACTIVE] = undef;
  }
  1129.  
  1130. 1;