From b0f8064d0d6a747aac5b45bc44c3c4abda7873ad Mon Sep 17 00:00:00 2001
From: sunshine <sunshine>
Date: Wed, 4 Aug 2004 14:35:12 +0000
Subject: [PATCH] Add unit test

---
 apps/sam/python/src/i2p/test/test_select.py | 113 ++++++++++++++++++++
 1 file changed, 113 insertions(+)
 create mode 100644 apps/sam/python/src/i2p/test/test_select.py

diff --git a/apps/sam/python/src/i2p/test/test_select.py b/apps/sam/python/src/i2p/test/test_select.py
new file mode 100644
index 0000000000..c6e6c3637d
--- /dev/null
+++ b/apps/sam/python/src/i2p/test/test_select.py
@@ -0,0 +1,113 @@
+
+# -----------------------------------------------------
+# test_select.py: Unit tests for select.py.
+# -----------------------------------------------------
+
+# Make sure we can import i2p
+import sys; sys.path += ['../../']
+
+import time
+
+import traceback, sys
+from i2p import socket, select
+import i2p.socket
+import socket as pysocket
+
+def minitest_select(rans, wans, eans, timeout,
+                    f1=None, f4=None, c1=None, c4=None):
+  """Mini-unit test for select (Python and I2P sockets).
+     Calls f1() on socket S1, f4() on socket S4, uses select()
+     timeout 'timeout'.  rans, wans, and eans should be lists
+     containing indexes 1...6 of the sockets defined below.  The
+     result of i2p.select.select() will be verified against these
+     lists.  After this, calls c1() on S1, and c4() on S4."""
+  S1 = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM)
+  S2 = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_DGRAM)
+  S3 = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_RAW)
+
+  kw = {'in_depth':0, 'out_depth':0}
+  S4 = socket.socket('Fella', socket.SOCK_STREAM, **kw)
+  S5 = socket.socket('Boar',  socket.SOCK_DGRAM,  **kw)
+  S6 = socket.socket('Gehka', socket.SOCK_RAW,    **kw)
+
+  if f1: f1(S1)
+  if f4: f4(S4)
+
+  L = [S1, S2, S3, S4, S5, S6]
+
+  start = time.time()
+  ans  = select.select(L,  L,  L,  timeout)
+  ans1 = select.select(L,  [], [], timeout)
+  ans2 = select.select([], L,  [], timeout)
+  ans3 = select.select([], [], L,  timeout)
+  end = time.time()
+  T = end - start
+
+  ans  = [[L.index(x) + 1 for x in ans [i]] for i in range(3)]
+  ans1 = [[L.index(x) + 1 for x in ans1[i]] for i in range(3)]
+  ans2 = [[L.index(x) + 1 for x in ans2[i]] for i in range(3)]
+  ans3 = [[L.index(x) + 1 for x in ans3[i]] for i in range(3)]
+
+  print ans1[0], rans
+  assert ans1[0] == rans
+  print ans2[1], wans
+  assert ans2[1] == wans
+  print ans3[2], eans
+  assert ans3[2] == eans
+  print ans, [rans, wans, eans]
+  assert ans  == [rans, wans, eans]
+  assert T < 4 * timeout + 0.1
+
+  if c1: c1(S1)
+  if c4: c4(S4)
+
+def test_select():
+  """Unit test for select (Python and I2P sockets)."""
+
+  def connect1(S):
+    """Connect regular Python socket to Google."""
+    ip = pysocket.gethostbyname('www.google.com')
+    S.connect((ip, 80))
+
+  def connect4(S):
+    """Connect I2P Python socket to duck.i2p."""
+    S.connect('duck.i2p')
+
+  def full1(S):
+    """Connect regular Python socket to Google, and send."""
+    connect1(S)
+    S.sendall('GET / HTTP/1.0\r\n\r\n')
+    print S.recv(1)
+
+  def full4(S):
+    """Connect I2P Python socket to duck.i2p, and send."""
+    connect4(S)
+    S.sendall('GET / HTTP/1.0\r\n\r\n')
+    S.recv(1)
+    # Peek twice (make sure peek code isn't causing problems).
+    S.recv(1, i2p.socket.MSG_PEEK | i2p.socket.MSG_DONTWAIT)
+    S.recv(1, i2p.socket.MSG_PEEK | i2p.socket.MSG_DONTWAIT)
+  
+  def check(S):
+    """Verify that three chars recv()d are 'TTP'."""
+    assert S.recv(3) == 'TTP'
+
+  try:
+    for t in [0.0, 1.0]:
+      minitest_select([], [2, 3, 5, 6], [], t)
+      minitest_select([], [1, 2, 3, 4, 5, 6], [], t,
+                      f1=connect1, f4=connect4)
+      minitest_select([], [1, 2, 3, 5, 6], [], t,
+                      f1=connect1)
+      minitest_select([], [2, 3, 4, 5, 6], [], t,
+                      f4=connect4)
+      minitest_select([1, 4], [1, 2, 3, 4, 5, 6], [], t,
+                      f1=full1, f4=full4, c1=check, c4=check)
+  except:
+    print 'Unit test failed for i2p.select.select().'
+    traceback.print_exc(); sys.exit()
+  print 'i2p.select.select():      OK'
+
+
+if __name__ == '__main__':
+  test_select()
-- 
GitLab