From 2df4370477990425581a9e08659b2c8ccf73a09f Mon Sep 17 00:00:00 2001
From: brianr <brianr>
Date: Wed, 19 May 2004 01:26:02 +0000
Subject: [PATCH] Some changes to make the SAM module never block if called on
 a socket which select() says is safe to read/write or called in any case on a
 socket which is O_NONBLOCK

Significant work is still required.
---
 apps/sam/perl/Net/SAM.pm | 242 ++++++++++++++++++++++++++++++---------
 1 file changed, 187 insertions(+), 55 deletions(-)

diff --git a/apps/sam/perl/Net/SAM.pm b/apps/sam/perl/Net/SAM.pm
index 3507e82cdf..109330e1bb 100644
--- a/apps/sam/perl/Net/SAM.pm
+++ b/apps/sam/perl/Net/SAM.pm
@@ -10,6 +10,8 @@ package Net::SAM;
 
 use strict;
 
+use POSIX;
+
 use Switch;
 
 use IO::Socket;
@@ -73,71 +75,199 @@ sub lookup {
 
 #}
 
-sub readprocess {    
+
+sub readprocesswrite {
     my $self = shift;
-    my $chunk;
-    my $payload; 
+    $self->readprocess();
+    $self->dowrite();
+}
 
-    print "readprocess: " . $self->connected() . "\n";
+sub doread {
+    my $self = shift;
+    my $rv;
+    my $data;
+    
+    $rv = $self->recv($data, $POSIX::BUFSIZE, 0);
 
-    # May block if the SAM bridge gets hosed
-    my $response = <$self>;
+    if ( defined($rv) && ( length($data) >= 1 ) ) {
+	# We received some data. Put it in our buffer.
+	${*$self}->{inbuffer} += $data;
+    } else {
+	# No data. Either we're on a non-blocking socket, or there 
+	# was an error or EOF
+	if ( $!{EAGAIN} ) {
+	    return 1;
+	} else {
+	    # I suppose caller can look at $! for details
+	    return undef; 
+	}
+    }
+}
 
-    print "readprocess: $!" . $self->connected() . "\n";
 
-    chomp $response;
-    my ($primative, $more, $extra) = split (' ', $response, 3);
+sub dowrite {
+    my $self = shift;
+    my $rv;
+    my $data; 
 
-    $primative = uc($primative);
+    $rv = $self->send(${*$self}->{outbuffer}, 0);
+    
+    if ( ! defined($rv) ) {
+	warn "SAM::dowrite - Couldn't write for no apparent reason.\n";
+	return undef; 
+    }
 
-    print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
+    if ( $rv == length(${*$self}->{outbuffer}) || $!{EWOULDBLOCK} ) {
+	substr(${*$self}->{outbuffer},0, $rv) = ''; # Remove from buffer
 
-    switch ($primative) {
+	# Nuke buffer if empty
+	delete ${*$self}->{outbuffer} unless length(${*$self}->{outbuffer});
+    } else {
+	# Socket closed on us or something?
+	return undef;
+    }
+}
+
+sub messages {
+    my $self = shift;
+    
+    return @{ ${*$self}->{messages} };
+}
 
-	case "HELLO" {
-	    if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
-	    if ($extra =~ m/NOVERSION/ ) { 
-		die("SAM Bridge Doesn't support my version") ;
-	    }
-	    $self->_hashtv($extra);
-	    ${*$self}->{greeted} = 1;
-	};
-	case "SESSION" {
-	    if ( $more !~ m/STATUS/ ) {
-		die("Bogus SESSION response");
-	    }
-	    $self->_hashtv($extra);
+sub queuemessage {
+
+    my $self = shift;
+    my $message = shift; 
+    
+    push @{ ${*$self}->{messages} } , $message;
+}
+
+sub unqueuemessage {
+    my $self = shift;
+    
+    return unshift(@{ ${*$self}->{messages} } );
+    
+}
+
+sub readprocess {
+    my $self = shift;
+
+    $self->doread();
+    $self->process();
+}
+
+sub process {    
+    my $self = shift;
+    my %tvhash; 
+    my $payload; 
+    
+
+    # Before we can read any new messages, if an existing message has payload
+    # we must read it in. Otherwise we'll create garbage messages containing
+    # the payload of previous messages. 
+
+    if ( ${*$self}->{payloadrequired} >= 1 ) {
+
+	if ( length( ${*$self}->{inbuffer} ) >= ${*$self}->{payloadrequired} ) {
+	    # Scarf payload from inbuffer into $payload
+	    $payload = substr(${*$self}->{inbuffer}, 0, 
+			      ${*$self}->{payloadrequired});
+	    
+	    # Nuke payload from inbuffer
+	    substr(${*$self}->{inbuffer}, 0,
+		   ${*$self}->{payloadrequired} ) = '';
+	    
+	    # Put message with payload into spool
+	    push @{ ${*$self}->{messages} } , 
+	    ${*$self}->{messagerequiringpayload}.$payload;
+
+	    # Delete the saved message requiring payload
+	    delete ${*$self}->{messagerequiringpayload};
+	} else {
+	    # Insufficient payload in inbuffer. Try again later. 
+	    return 1;
+	}
+
+    }
+
+
+    if ( ${*$self}->{inbuffer} =~ s/(.*\n)// ) {
+	%tvhash = $self->_hashtv($1); # Returns a tag/value hash
+	if ( $tvhash{SIZE} ) {
+	    # We've got a message with payload on our hands. :(
+	    ${*$self}->{payloadrequired} = $tvhash{SIZE}; 
+	    ${*$self}->{messagerequiringpayload} = $1; 
+	    return 1; # Could call ourself here, but we'll get called again. 
+	} else {
+	    push @{ ${*$self}->{messages} } , $1;
 	}
-	case "STREAM" {};
-	case "DATAGRAM" {
-	    if ( $more !~ m/RECEIVE/ ) {
-		die("Bogus DATAGRAM response.");
-	    }
-	    $self->_hashtv($extra);
-	    push @{ ${*$self}->{incomingdatagram } }, 
-		    [ ${*$self}->{DESTINATION},
-		      $self->_readblock(${*$self}->{SIZE}) ];
-		      
-	};
-	case "RAW" {
-	    if ( $more !~ m/RECEIVE/ ) {
-		die("Bogus RAW response.");
-	    }
-	    $self->_hashtv($extra);
-
-	    push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
-	};
-	case "NAMING" {
-	    if ( $more !~ m/REPLY/ ) {
-		die("Bogus NAMING response");
-	    }
-	    $self->_hashtv($extra);
-	};
-	case "DEST" {};
     }
     return 1; 
 }
 
+# sub junk {
+
+
+#     print "readprocess: " . $self->connected() . "\n";
+
+#     # May block if the SAM bridge gets hosed
+#     my $response = <$self>;
+
+#     print "readprocess: $!" . $self->connected() . "\n";
+
+#     chomp $response;
+#     my ($primative, $more, $extra) = split (' ', $response, 3);
+
+#     $primative = uc($primative);
+
+#     print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
+
+#     switch ($primative) {
+
+# 	case "HELLO" {
+# 	    if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
+# 	    if ($extra =~ m/NOVERSION/ ) { 
+# 		die("SAM Bridge Doesn't support my version") ;
+# 	    }
+# 	    $self->_hashtv($extra);
+# 	    ${*$self}->{greeted} = 1;
+# 	};
+# 	case "SESSION" {
+# 	    if ( $more !~ m/STATUS/ ) {
+# 		die("Bogus SESSION response");
+# 	    }
+# 	    $self->_hashtv($extra);
+# 	}
+# 	case "STREAM" {};
+# 	case "DATAGRAM" {
+# 	    if ( $more !~ m/RECEIVE/ ) {
+# 		die("Bogus DATAGRAM response.");
+# 	    }
+# 	    $self->_hashtv($extra);
+# 	    push @{ ${*$self}->{incomingdatagram } }, 
+# 		    [ ${*$self}->{DESTINATION},
+# 		      $self->_readblock(${*$self}->{SIZE}) ];
+		      
+# 	};
+# 	case "RAW" {
+# 	    if ( $more !~ m/RECEIVE/ ) {
+# 		die("Bogus RAW response.");
+# 	    }
+# 	    $self->_hashtv($extra);
+
+# 	    push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
+# 	};
+# 	case "NAMING" {
+# 	    if ( $more !~ m/REPLY/ ) {
+# 		die("Bogus NAMING response");
+# 	    }
+# 	    $self->_hashtv($extra);
+# 	};
+# 	case "DEST" {};
+#     }
+#     return 1; 
+# }
+
 sub getfh {
     # Return the FH of the SAM socket so apps can select() or poll() on it
     my $self = shift;
@@ -161,12 +291,14 @@ sub _readblock {
 
 sub _hashtv {
     my $self = shift;
-    my $extra = shift;
+    my $tvstring = shift;
+    my $tvhash;
 
-    while ( $extra=~ m/(\S+)=(\S+)/sg ) {
-	${*$self}->{$1}=$2;
-	print "$1=$2\n"
+    while ( $tvstring =~ m/(\S+)=(\S+)/sg ) {
+	$tvhash->{$1}=$2;
+	print "hashtv: $1=$2\n"
     }
+    return $tvhash;
 }
 
 sub DESTROY {
-- 
GitLab