Skip to content

Commit 2e99411

Browse files
committed
enable control of services initiated by the monitor: receiveFromNodeStandardOutput and sendToNodeStandardInput
1 parent f224a90 commit 2e99411

File tree

1 file changed

+160
-27
lines changed

1 file changed

+160
-27
lines changed

app/server/Transient/Move/Services/MonitorService.hs

Lines changed: 160 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import Control.Exception(SomeException(..))
2626
import Control.Concurrent
2727
import Control.Monad
2828
import Data.List
29+
import System.IO
2930
import System.Process
3031
import System.Directory
3132
import Data.Monoid
@@ -34,21 +35,31 @@ import System.IO.Unsafe
3435
import Data.IORef
3536
import qualified Data.Map as M
3637
import GHC.Conc
38+
import Data.Maybe(fromMaybe)
39+
import Control.Exception
40+
import qualified Data.ByteString.Lazy.Char8 as BS
41+
import qualified Data.ByteString.Char8 as BSS
3742

3843

3944

4045

4146
main = do
4247
putStrLn "Starting Transient monitor"
43-
keep' $ runService monitorService 3000
48+
keep $ runService monitorService 3000
4449

4550
[serve receiveStatus
4651
,serve returnInstances
47-
,serve reReturnInstances]
52+
,serve reReturnInstances
53+
54+
,serve receiveFromNodeStandardOutputIt
55+
,serve sendToNodeStandardInputIt
56+
,serve getLogIt
57+
]
4858
empty
4959

5060

51-
61+
{- ping is not used to determine healt of services. The client program notify the
62+
monitor when a service fails, with reInitService.
5263
pings = do
5364
5465
localIO $ print $ "INITIATING PINGSSSSSSSSSSSSSSSSSSSSSSS"
@@ -61,11 +72,12 @@ pings = do
6172
6273
local $ threads 1 $ runCloud $ mapM ping $ tail nodes
6374
empty
64-
75+
-}
6576

6677

6778
type Port= Int
6879

80+
-- | receive a status from an executable.
6981
receiveStatus :: (Port, String) -> Cloud ()
7082
receiveStatus (port, logLine)= do
7183
localIO $ appendFile ("log"++ show port) $ logLine++"\n"
@@ -90,14 +102,18 @@ withBlockingService serv proc= do
90102
localIO $ atomicModifyIORef blockings $ \map -> (M.delete serv map,())
91103
return r
92104

105+
-- | gets a node with a service, which probably failed and return other n instances of the same service.
106+
-- This is used to implement failover.
93107
reReturnInstances :: (String, Node, Int) -> Cloud [Node]
94108
reReturnInstances (ident, node, num)= do
95109
local $ delNodes [node]
96110
returnInstances (ident, nodeServices node, num)
97111

112+
-- | install and return n instances of a service, distributed
113+
-- among all the nodes which have monitoService executables running and connected
98114
returnInstances :: (String, Service, Int) -> Cloud [Node]
99115
returnInstances (ident, service, num)= withBlockingService service $ do
100-
116+
return () !> "RETURNINSTANCES"
101117
nodes <- local $ findInNodes service >>= return . take num
102118

103119
let n= num - length nodes
@@ -117,29 +133,29 @@ returnInstances (ident, service, num)= withBlockingService service $ do
117133
lacking= num `rem` nnodes
118134
(nodes1,nodes2)= splitAt lacking nodes
119135
return () !> (pernode,lacking,nodes1,nodes2)
120-
rs <- callNodes' nodes1 (<>) mempty (installHere ident service (pernode+1)) <>
121-
callNodes' nodes2 (<>) mempty (installHere ident service pernode)
136+
rs <- callNodes' nodes1 (<>) mempty (installHere service (pernode+1)) <>
137+
callNodes' nodes2 (<>) mempty (installHere service pernode)
122138
local $ addNodes rs
123139
ns <- onAll getNodes
124140

125141
return rs !> ("MONITOR RETURN---------------------------------->", rs)
126142

127-
-- installIt = installHere ident service <|> installThere ident service
128-
installHere :: String -> Service -> Int -> Cloud [ Node]
129-
installHere ident service n= local $ replicateM n installOne
143+
-- installIt = installHere service <|> installThere service
144+
installHere :: Service -> Int -> Cloud [ Node]
145+
installHere service n= local $ replicateM n installOne
130146
where
131147
installOne= do
132148
port <- liftIO freePort
133149
install service port
134150
return () !> "INSTALLED"
135151

136152
thisNode <- getMyNode
137-
let node= Node (nodeHost thisNode) port Nothing service -- node to be published
138-
nodelocal= Node "localhost" port Nothing [("externalNode", show $ node{nodeServices=[]})] -- local node
139-
addNodes [node] -- {nodeServices=("localNode", show nodelocal{nodeServices=[]}):nodeServices node},nodelocal ]
140-
return node -- {nodeServices= nodeServices node ++ [("relay",show thisNode{nodeServices=[]})]}
153+
let node= Node (nodeHost thisNode) port Nothing (service ++ relayinfo thisNode) -- node to be published
154+
addNodes [node]
155+
return node
141156
`catcht` \(e :: SomeException) -> liftIO (putStr "INSTALLLLLLLLLLLLLLL2222222: " >> print e) >> empty
142-
157+
158+
relayinfo mon= if nodeHost mon /= "localhost" then [("relay",show(nodeHost mon,nodePort mon))] else []
143159

144160

145161

@@ -150,10 +166,12 @@ install :: Service -> Int -> TransIO ()
150166

151167
install service port= do
152168
-- return () !> "IIIIIIIIIIIIIIINSTALL"
169+
153170
install' `catcht` \(e :: SomeException) -> liftIO (putStr "INSTALL error: " >> print e) >> empty
154171
where
155172
install'= do
156-
let host= "localhost"
173+
my <- getMyNode
174+
let host= nodeHost my
157175
program <- return (lookup "executable" service) `onNothing` empty
158176
-- return () !> ("program",program)
159177
tryExec program host port <|> tryDocker service host port program
@@ -174,28 +192,27 @@ tryInstall service = do
174192
tryDocker service host port program= do
175193
image <- emptyIfNothing $ lookup "image" service
176194
path <- Transient $ liftIO $ findExecutable "docker" -- return empty if not found
177-
liftIO $ callProcess path ["run", image,"-p"," start/"++host++"/"++ show port++ " " ++ program]
195+
liftIO $ callProcess path ["run", image,"-p"," start/"++ host++"/"++ show port++ " " ++ program]
178196

179197

180198
tryExec program host port= do
181-
path <- Transient $ liftIO $ findExecutable program -- !> ("findExecutable", program)
199+
path <- Transient $ liftIO $ findExecutable program -- would abandon (empty) if the executable is not found
182200
spawnProgram program host port -- !>"spawn"
183201
where
184-
spawnProgram program host port= liftIO $ do
202+
spawnProgram program host port= do
185203

186204
let prog = pathExe program host port
187-
putStr "executing: " >> putStrLn prog
188-
let createprostruct= shell prog
189-
createProcess $ createprostruct ; return ()
205+
liftIO $ putStr "executing: " >> putStrLn prog
190206

191-
threadDelay 2000000
207+
(networkExecuteStreamIt prog >> empty) <|> return () !> "INSTALLING"
208+
liftIO $ threadDelay 2000000
192209

193-
-- return() !> ("INSTALLED", program)
210+
return() !> ("INSTALLED", program)
194211
where
195212

196-
pathExe program host port=
197-
program ++ " -p start/" ++ show (host ::String)
198-
++"/" ++ show (port ::Int) ++ " > "++ program ++ host ++ show port ++ ".log 2>&1"
213+
pathExe program host port=
214+
program ++ " -p start/" ++ (host ::String)
215+
++"/" ++ show (port ::Int) -- ++ " > "++ program ++ host ++ show port ++ ".log 2>&1"
199216

200217

201218

@@ -220,3 +237,119 @@ installGit package = liftIO $ do
220237
tail1 []=[]
221238
tail1 x= tail x
222239

240+
241+
-------------------------execution ----------------------------
242+
243+
getLogIt :: GetLog -> Cloud BS.ByteString
244+
getLogIt (GetLog node)= do
245+
let program = fromMaybe (error "no Executable in service "++ show (nodeServices node)) $
246+
lookup "executable" (nodeServices node)
247+
let expr = pathExe program (nodeHost node) (nodePort node)
248+
localIO $ BS.readFile $ logFileName expr
249+
250+
251+
sendToNodeStandardInputIt :: (Node, String) -> Cloud ()
252+
sendToNodeStandardInputIt (node,inp)= do
253+
let program = fromMaybe (error "no Executable in service "++ show (nodeServices node)) $
254+
lookup "executable" (nodeServices node)
255+
expr= pathExe program (nodeHost node) (nodePort node)
256+
return () !> ("SEND TO NODE STANDARD INPUT", program, expr)
257+
sendExecuteStreamIt1 (expr, inp)
258+
where
259+
sendExecuteStreamIt1 (cmdline, inp)= localIO $ do
260+
map <- readIORef rinput
261+
let input1= fromMaybe (error "this command line has not been opened") $ M.lookup cmdline map
262+
hPutStrLn input1 inp
263+
hFlush input1
264+
return()
265+
266+
receiveFromNodeStandardOutputIt :: ReceiveFromNodeStandardOutput -> Cloud String
267+
receiveFromNodeStandardOutputIt (ReceiveFromNodeStandardOutput node ident) = local $ do
268+
let program = fromMaybe (error "no Executable in service "++ show (nodeServices node)) $
269+
lookup "executable" (nodeServices node)
270+
expr= pathExe program (nodeHost node) (nodePort node)
271+
return () !> ("RECEIVE FROM STANDARD OUTPUT",expr)
272+
labelState ident
273+
getMailbox' ("output"++ expr)
274+
275+
rinput :: IORef (M.Map String Handle)
276+
rinput= unsafePerformIO $ newIORef M.empty
277+
278+
279+
280+
281+
282+
283+
284+
-- | execute the shell command specified in a string and stream back at runtime -line by line- the standard output
285+
-- as soon as there is any output. It also stream all the standard error in case of exiting with a error status.
286+
-- to the service caller. invoked by `networkExecuteStream`.
287+
288+
289+
logFileName expr= subst expr ++ ".log"
290+
where
291+
subst []= []
292+
subst (' ':xs)= '-':subst xs
293+
subst ('/':xs)= '-':subst xs
294+
subst ('\"':xs)= '-':subst xs
295+
subst (x:xs)= x:subst xs
296+
297+
networkExecuteStreamIt :: String -> TransIO String
298+
networkExecuteStreamIt expr = do
299+
300+
r <- liftIO $ createProcess $ (shell expr){std_in=CreatePipe,std_err=CreatePipe,std_out=CreatePipe}
301+
liftIO $ atomicModifyIORef rinput $ \map -> (M.insert expr (input1 r) map,())
302+
303+
let logfile= logFileName expr
304+
305+
hlog <- liftIO $ openFile logfile WriteMode
306+
liftIO $ hPutStrLn hlog expr
307+
liftIO $ hClose hlog
308+
309+
line <- watch (output r) <|> watch (err r) <|> watchExitError r
310+
putMailbox' ("output" ++ expr) line
311+
hlog <- liftIO $ openFile logfile AppendMode
312+
liftIO $ hPutStrLn hlog line
313+
liftIO $ hClose hlog
314+
return line
315+
where
316+
317+
input1 r= inp where (Just inp,_,_,_)= r
318+
output r= out where (_,Just out,_,_)= r
319+
err r= err where (_,_,Just err,_)= r
320+
handle r= h where (_,_,_,h)= r
321+
322+
watch :: Handle -> TransIO String
323+
watch h= do
324+
abduce
325+
mline <- threads 0 $ (parallel $ (SMore <$> hGetLine' h) `catch` \(e :: SomeException) -> return SDone)
326+
case mline of
327+
SDone -> empty
328+
SMore line -> return line
329+
330+
where
331+
332+
hGetLine' h= do
333+
buff <- newIORef []
334+
getMore buff
335+
336+
where
337+
338+
getMore buff= do
339+
b <- hWaitForInput h 10
340+
if not b
341+
then do
342+
r <-readIORef buff
343+
if null r then getMore buff else return r
344+
else do
345+
c <- hGetChar h
346+
if c== '\n' then readIORef buff else do
347+
modifyIORef buff $ \str -> str ++ [c]
348+
getMore buff
349+
350+
watchExitError r= do -- make it similar to watch
351+
abduce
352+
liftIO $ waitForProcess $ handle r
353+
errors <- liftIO $ hGetContents (err r)
354+
return errors
355+

0 commit comments

Comments
 (0)