Merge remote branch 'origin/demonstration'
[deska.git] / tests / cli-interaction.py
blob87b4a5e4b2064e8395ce3111a76b20604739b19f
1 import sys
2 import subprocess
3 import select
4 import os
5 import unittest
7 # enable "import libLowLevelPyDeska"
8 path_libLowLevelPyDeska = os.path.abspath(os.getcwd() + "../../..")
9 sys.path.append(path_libLowLevelPyDeska)
10 # enable "import deska"
11 path_deska = os.path.abspath(os.environ["DESKA_SOURCES"] + "/src/deska/python")
12 sys.path.append(path_deska)
13 path_deska_server_bin = os.path.abspath(os.environ["DESKA_SOURCES"] +
14 "/src/deska/server/app/deska-server")
16 class TestCliInteraction(unittest.TestCase):
17 def setUp(self):
18 DBNAME = os.environ["DESKA_DB"]
19 DBUSER = os.environ["DESKA_USER"]
20 TESTCASE = os.environ["DESKA_TESTCASE"]
21 CFGGEN_EXTRA_OPTIONS = []
22 try:
23 CFGGEN_METHOD = os.environ["DESKA_CFGGEN_BACKEND"]
24 except KeyError:
25 CFGGEN_METHOD = "fake"
26 if CFGGEN_METHOD == "git":
27 CFGGEN_EXTRA_OPTIONS = ["--cfggen-script-path", os.environ["DESKA_CFGGEN_SCRIPTS"],
28 "--cfggen-git-repository", os.environ["DESKA_CFGGEN_GIT_PRIMARY_CLONE"],
29 "--cfggen-git-workdir", os.environ["DESKA_CFGGEN_GIT_WC"]]
30 # massage the PYTHONPATH for the children
31 if not os.environ.has_key("PYTHONPATH"):
32 os.environ["PYTHONPATH"] = ":".join([path_libLowLevelPyDeska, path_deska])
33 else:
34 os.environ["PYTHONPATH"] = ":".join([os.environ["PYTHONPATH"], path_libLowLevelPyDeska, path_deska])
36 self.tdata = file("%s/%s" % (os.path.abspath(os.environ["DESKA_SOURCES"] + "/tests/cli-interaction/"), TESTCASE), "rb")
37 cmd = ["../../deska-cli", "--DBConnection.Server", path_deska_server_bin, "--CLI.HistoryFilename", "/dev/null",
38 "--execute", "-"]
39 self.p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
41 def expect_stdin(self, expected_rdata):
42 readSoFar = ""
43 while True:
44 status = select.select((self.p.stdout, self.p.stderr), (), (), 3)
45 if (self.p.stderr in status[0]):
46 err = os.read(self.p.stderr.fileno(), 65536)
47 raise RuntimeError, "stderr: %s" % err
48 elif status[0] == []:
49 print "Read so far (%d): '%s'" % (len(readSoFar), readSoFar)
50 print "Expected to read (%d): '%s'" % (len(expected_rdata), expected_rdata)
51 raise RuntimeError, "tester: select() timed out, CLI stuck?"
52 else:
53 toRead = len(expected_rdata) - len(readSoFar)
54 self.assertTrue(toRead > 0)
55 readSoFar = readSoFar + os.read(self.p.stdout.fileno(), toRead)
56 if len(readSoFar) == len(expected_rdata):
57 print "R %s" % readSoFar
58 self.assertEqual(readSoFar, expected_rdata)
59 break
60 else:
61 continue
63 def test_interaction(self):
64 mode = "io"
65 for line in self.tdata:
66 if mode == "io":
67 if line.startswith("#") or line == "\n":
68 continue
69 elif line.startswith("w "):
70 print line
71 self.p.stdin.write(line[2:])
72 self.p.stdin.flush()
73 elif line.startswith("r "):
74 self.expect_stdin(line[2:])
75 elif line == "<dump>\n":
76 print "<dump>"
77 mode = "dump"
78 self.p.stdin.write("dump\n")
79 self.p.stdin.flush()
80 else:
81 raise RuntimeError("Unexpected test data in IO mode: %s" % line)
82 elif mode == "dump":
83 if line == "</dump>\n":
84 print "</dump>"
85 mode = "io"
86 else:
87 self.expect_stdin(line)
88 else:
89 r.assertFalse("Invalid state: %s" % mode)
90 self.p.stdin.close()
91 self.expect_stdin("All commands successfully executed.")
92 print "Everything went OK"
94 if __name__ == "__main__":
95 unittest.main()