CPS IO -> Async Interaction Tree Effect Runtime
I'm deeply satisfied to be building an interaction tree runtime where the interaction trees are themselves computed via and represented by trees. It's trees all the way down.
This commit is contained in:
652
src/IODriver.hs
652
src/IODriver.hs
@@ -4,17 +4,28 @@ module IODriver
|
||||
, unsafePerms
|
||||
, checkIOSentinel
|
||||
, runIO
|
||||
, runIOWithEnv
|
||||
, runIOWith
|
||||
) where
|
||||
|
||||
import Research (T(..), apply, toString, toNumber, ofString, ofNumber)
|
||||
import System.IO (putStr, getLine)
|
||||
import qualified System.IO as IO
|
||||
import Control.Exception (try, IOException, SomeException)
|
||||
import System.Exit (die)
|
||||
import System.IO.Error (isDoesNotExistError, isPermissionError, isAlreadyExistsError)
|
||||
import Data.List (isPrefixOf)
|
||||
import System.FilePath (normalise, isRelative, (</>), addTrailingPathSeparator, splitDirectories)
|
||||
import System.Directory (canonicalizePath, doesPathExist, getCurrentDirectory)
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Sequence as Seq
|
||||
import Data.Sequence (Seq, (|>), ViewL(..))
|
||||
import Data.Time.Clock (UTCTime, getCurrentTime, addUTCTime, diffUTCTime)
|
||||
import Control.Concurrent (threadDelay)
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Permissions
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
data IOPermissions = IOPermissions
|
||||
{ allowRead :: [FilePath]
|
||||
@@ -22,6 +33,7 @@ data IOPermissions = IOPermissions
|
||||
, allowReadAll :: Bool
|
||||
, allowWriteAll :: Bool
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
defaultPerms :: IOPermissions
|
||||
defaultPerms = IOPermissions [] [] False False
|
||||
@@ -41,91 +53,390 @@ checkIOSentinel tree =
|
||||
_ -> Left "sentinel mismatch (expected \"tricuIO\")"
|
||||
_ -> Left "root is not an IO sentinel pair"
|
||||
|
||||
runIO :: IOPermissions -> T -> IO T
|
||||
runIO perms actionTree = go actionTree
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Runtime, Frames, and Machine
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
data Runtime = Runtime
|
||||
{ rtPerms :: IOPermissions
|
||||
, rtEnv :: T
|
||||
, rtState :: T
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data Frame
|
||||
= BindFrame T
|
||||
| LocalFrame T
|
||||
deriving (Show)
|
||||
|
||||
data Machine = Machine
|
||||
{ machineRuntime :: Runtime
|
||||
, machineCurrent :: T
|
||||
, machineFrames :: [Frame]
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Result convention
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Direct-return actions pass the raw value to the continuation:
|
||||
-- pure, bind, putStr, getLine, ask, local, get, put,
|
||||
-- fork, await, yield, sleep
|
||||
--
|
||||
-- Result-return actions wrap the outcome as an ok/err pair:
|
||||
-- ok val = Fork (Stem Leaf) (Fork val Leaf) -- (t t) val t
|
||||
-- err code = Fork Leaf (Fork code Leaf) -- t code t
|
||||
-- readFile, writeFile
|
||||
--
|
||||
-- Runtime protocol errors are returned as direct values via errResult.
|
||||
|
||||
-- Error code ranges:
|
||||
-- 1-19 host IO / filesystem errors
|
||||
-- 20-39 policy / permission errors
|
||||
-- 40-59 protocol / decode / type errors
|
||||
-- 60-79 async errors
|
||||
-- 80-99 scheduler / runtime errors
|
||||
|
||||
-- Host IO / filesystem errors (1-19)
|
||||
errDoesNotExist, errPermission, errAlreadyExists, errIOOther :: Integer
|
||||
errDoesNotExist = 1
|
||||
errPermission = 2
|
||||
errAlreadyExists = 3
|
||||
errIOOther = 4
|
||||
|
||||
-- Policy / permission errors (20-39)
|
||||
errPolicyDeny :: Integer
|
||||
errPolicyDeny = 20
|
||||
|
||||
-- Protocol / decode / type errors (40-59)
|
||||
errInvalidAction, errInvalidString :: Integer
|
||||
errInvalidAction = 40
|
||||
errInvalidString = 41
|
||||
|
||||
-- Async errors (60-79)
|
||||
errInvalidHandle, errSelfAwait, errInvalidSleep :: Integer
|
||||
errInvalidHandle = 60
|
||||
errSelfAwait = 61
|
||||
errInvalidSleep = 62
|
||||
|
||||
-- Scheduler / runtime errors (80-99)
|
||||
errDeadlock :: Integer
|
||||
errDeadlock = 80
|
||||
|
||||
ioErrorCode :: IOException -> Integer
|
||||
ioErrorCode e
|
||||
| isDoesNotExistError e = errDoesNotExist
|
||||
| isPermissionError e = errPermission
|
||||
| isAlreadyExistsError e = errAlreadyExists
|
||||
| otherwise = errIOOther
|
||||
|
||||
okResult :: T -> T
|
||||
okResult val = Fork (Stem Leaf) (Fork val Leaf)
|
||||
|
||||
errResult :: Integer -> T
|
||||
errResult code = Fork Leaf (Fork (ofNumber code) Leaf)
|
||||
|
||||
pureAction :: T -> T
|
||||
pureAction x = Fork (ofNumber 0) x
|
||||
|
||||
invalidAsyncHandleResult :: T
|
||||
invalidAsyncHandleResult = errResult errInvalidHandle
|
||||
|
||||
selfAwaitResult :: T
|
||||
selfAwaitResult = errResult errSelfAwait
|
||||
|
||||
deadlockResult :: T
|
||||
deadlockResult = errResult errDeadlock
|
||||
|
||||
invalidSleepResult :: T
|
||||
invalidSleepResult = errResult errInvalidSleep
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Task identity and handles
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
newtype TaskId = TaskId Integer
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
taskHandle :: TaskId -> T
|
||||
taskHandle (TaskId n) =
|
||||
Fork (ofString "task") (ofNumber n)
|
||||
|
||||
decodeTaskHandle :: T -> Either String TaskId
|
||||
decodeTaskHandle tree =
|
||||
case tree of
|
||||
Fork tag nTree -> do
|
||||
tagString <- toString tag
|
||||
if tagString == "task"
|
||||
then TaskId <$> toNumber nTree
|
||||
else Left "invalid task handle tag"
|
||||
_ ->
|
||||
Left "invalid task handle"
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Free-monad action AST
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
data Action
|
||||
= APure T
|
||||
| ABind T T
|
||||
| APutStr T
|
||||
| AGetLine
|
||||
| AReadFile T
|
||||
| AWriteFile T T
|
||||
| AAsk
|
||||
| ALocal T T
|
||||
| AGet
|
||||
| APut T
|
||||
| AFork T
|
||||
| AAwait T
|
||||
| AYield
|
||||
| ASleep T
|
||||
deriving (Show)
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Action tag constants
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
tagPure, tagBind :: Integer
|
||||
tagPure = 0
|
||||
tagBind = 1
|
||||
|
||||
tagPutStr, tagGetLine :: Integer
|
||||
tagPutStr = 10
|
||||
tagGetLine = 11
|
||||
|
||||
tagReadFile, tagWriteFile :: Integer
|
||||
tagReadFile = 20
|
||||
tagWriteFile = 21
|
||||
|
||||
tagAsk, tagLocal :: Integer
|
||||
tagAsk = 30
|
||||
tagLocal = 31
|
||||
|
||||
tagGet, tagPut :: Integer
|
||||
tagGet = 40
|
||||
tagPut = 41
|
||||
|
||||
tagFork, tagAwait, tagYield, tagSleep :: Integer
|
||||
tagFork = 60
|
||||
tagAwait = 61
|
||||
tagYield = 62
|
||||
tagSleep = 63
|
||||
|
||||
data Step
|
||||
= Halt Runtime T
|
||||
| Continue Machine
|
||||
| ForkRequested T Machine
|
||||
| AwaitRequested TaskId Machine
|
||||
| YieldRequested Machine
|
||||
| SleepRequested Integer Machine
|
||||
deriving (Show)
|
||||
|
||||
decodeAction :: T -> Either String Action
|
||||
decodeAction tree =
|
||||
case tree of
|
||||
Fork tag payload ->
|
||||
case toNumber tag of
|
||||
Right n | n == tagPure ->
|
||||
Right (APure payload)
|
||||
|
||||
Right n | n == tagBind ->
|
||||
case payload of
|
||||
Fork left k -> Right (ABind left k)
|
||||
_ -> Left "Invalid Bind: expected pair action continuation"
|
||||
|
||||
Right n | n == tagPutStr ->
|
||||
Right (APutStr payload)
|
||||
|
||||
Right n | n == tagGetLine ->
|
||||
Right AGetLine
|
||||
|
||||
Right n | n == tagReadFile ->
|
||||
Right (AReadFile payload)
|
||||
|
||||
Right n | n == tagWriteFile ->
|
||||
case payload of
|
||||
Fork path contents -> Right (AWriteFile path contents)
|
||||
_ -> Left "Invalid WriteFile: expected pair path contents"
|
||||
|
||||
Right n | n == tagAsk ->
|
||||
Right AAsk
|
||||
|
||||
Right n | n == tagLocal ->
|
||||
case payload of
|
||||
Fork f action -> Right (ALocal f action)
|
||||
_ -> Left "Invalid Local: expected pair function action"
|
||||
|
||||
Right n | n == tagGet ->
|
||||
Right AGet
|
||||
|
||||
Right n | n == tagPut ->
|
||||
Right (APut payload)
|
||||
|
||||
Right n | n == tagFork ->
|
||||
Right (AFork payload)
|
||||
|
||||
Right n | n == tagAwait ->
|
||||
Right (AAwait payload)
|
||||
|
||||
Right n | n == tagYield ->
|
||||
Right AYield
|
||||
|
||||
Right n | n == tagSleep ->
|
||||
Right (ASleep payload)
|
||||
|
||||
Right n ->
|
||||
Left $ "Unknown IO action tag: " ++ show n
|
||||
|
||||
Left err ->
|
||||
Left $ "Invalid action tag: " ++ err
|
||||
|
||||
_ ->
|
||||
Left $ "Invalid action tree: expected pair tag payload, got " ++ show tree
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Small-step IO machine
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
finishValue :: Machine -> T -> IO Step
|
||||
finishValue machine value =
|
||||
case machineFrames machine of
|
||||
[] ->
|
||||
pure (Halt (machineRuntime machine) value)
|
||||
|
||||
BindFrame k : rest ->
|
||||
pure (Continue machine
|
||||
{ machineCurrent = apply k value
|
||||
, machineFrames = rest
|
||||
})
|
||||
|
||||
LocalFrame oldEnv : rest ->
|
||||
let runtime' = (machineRuntime machine) { rtEnv = oldEnv }
|
||||
in pure (Continue machine
|
||||
{ machineRuntime = runtime'
|
||||
, machineCurrent = pureAction value
|
||||
, machineFrames = rest
|
||||
})
|
||||
|
||||
stepMachine :: Machine -> IO Step
|
||||
stepMachine machine =
|
||||
case decodeAction (machineCurrent machine) of
|
||||
Right action -> dispatch action
|
||||
Left _ -> finishValue machine (errResult errInvalidAction)
|
||||
where
|
||||
go tree =
|
||||
case tree of
|
||||
Fork tag payload -> do
|
||||
tagNum <- case toNumber tag of
|
||||
Right n -> return n
|
||||
Left err -> die $ "Invalid IO action tag: " ++ err
|
||||
dispatch tagNum payload
|
||||
_ -> die $ "Invalid IO action tree: expected pair tag payload, got " ++ show tree
|
||||
dispatch action = case action of
|
||||
APure val ->
|
||||
finishValue machine val
|
||||
|
||||
dispatch tagNum payload = case tagNum of
|
||||
0 -> return payload -- Pure
|
||||
ABind left k ->
|
||||
pure (Continue machine
|
||||
{ machineCurrent = left
|
||||
, machineFrames = BindFrame k : machineFrames machine
|
||||
})
|
||||
|
||||
1 -> case payload of
|
||||
Fork str k -> do
|
||||
s <- decodeString str "PutStr"
|
||||
putStr s
|
||||
go (apply k Leaf)
|
||||
_ -> die "Invalid PutStr payload: expected pair string continuation"
|
||||
APutStr str ->
|
||||
case decodeString str "PutStr" of
|
||||
Right s -> do
|
||||
putStr s
|
||||
finishValue machine Leaf
|
||||
Left _ ->
|
||||
finishValue machine (errResult errInvalidString)
|
||||
|
||||
2 -> do
|
||||
AGetLine -> do
|
||||
line <- getLine
|
||||
go (apply payload (ofString line))
|
||||
finishValue machine (ofString line)
|
||||
|
||||
3 -> case payload of
|
||||
Fork path k -> do
|
||||
p <- decodeString path "ReadFile"
|
||||
mDeny <- checkReadPerm p
|
||||
case mDeny of
|
||||
Just denied -> go (apply k denied)
|
||||
Nothing -> do
|
||||
content <- tryReadFile p
|
||||
go (apply k content)
|
||||
_ -> die "Invalid ReadFile payload: expected pair path continuation"
|
||||
|
||||
4 -> case payload of
|
||||
Fork path rest -> case rest of
|
||||
Fork contents k -> do
|
||||
p <- decodeString path "WriteFile"
|
||||
c <- decodeString contents "WriteFile"
|
||||
mDeny <- checkWritePerm p
|
||||
AReadFile path ->
|
||||
case decodeString path "ReadFile" of
|
||||
Right p -> do
|
||||
mDeny <- checkReadPerm p
|
||||
case mDeny of
|
||||
Just denied -> go (apply k denied)
|
||||
Nothing -> do
|
||||
res <- tryWriteFile p c
|
||||
go (apply k res)
|
||||
_ -> die "Invalid WriteFile payload: expected pair contents continuation"
|
||||
_ -> die "Invalid WriteFile payload: expected pair path (pair contents continuation)"
|
||||
Just denied -> finishValue machine denied
|
||||
Nothing -> tryReadFile p >>= finishValue machine
|
||||
Left _ -> finishValue machine (errResult errInvalidString)
|
||||
|
||||
_ -> die $ "Unknown IO action tag: " ++ show tagNum
|
||||
AWriteFile path contents ->
|
||||
case decodeString path "WriteFile" of
|
||||
Right p ->
|
||||
case decodeString contents "WriteFile" of
|
||||
Right c -> do
|
||||
mDeny <- checkWritePerm p
|
||||
case mDeny of
|
||||
Just denied -> finishValue machine denied
|
||||
Nothing -> tryWriteFile p c >>= finishValue machine
|
||||
Left _ -> finishValue machine (errResult errInvalidString)
|
||||
Left _ -> finishValue machine (errResult errInvalidString)
|
||||
|
||||
decodeString t ctx =
|
||||
case toString t of
|
||||
Right s -> return s
|
||||
Left err -> die $ "Invalid " ++ ctx ++ " string: " ++ err
|
||||
AAsk ->
|
||||
finishValue machine (rtEnv (machineRuntime machine))
|
||||
|
||||
ALocal f action' ->
|
||||
let runtime = machineRuntime machine
|
||||
oldEnv = rtEnv runtime
|
||||
newEnv = apply f oldEnv
|
||||
runtime' = runtime { rtEnv = newEnv }
|
||||
in pure (Continue machine
|
||||
{ machineRuntime = runtime'
|
||||
, machineCurrent = action'
|
||||
, machineFrames = LocalFrame oldEnv : machineFrames machine
|
||||
})
|
||||
|
||||
AGet ->
|
||||
finishValue machine (rtState (machineRuntime machine))
|
||||
|
||||
APut newState ->
|
||||
let runtime' = (machineRuntime machine) { rtState = newState }
|
||||
in finishValue (machine { machineRuntime = runtime' }) Leaf
|
||||
|
||||
AFork childAction ->
|
||||
pure (ForkRequested childAction machine)
|
||||
|
||||
AAwait handleTree ->
|
||||
case decodeTaskHandle handleTree of
|
||||
Right taskId ->
|
||||
pure (AwaitRequested taskId machine)
|
||||
Left _ ->
|
||||
finishValue machine invalidAsyncHandleResult
|
||||
|
||||
AYield ->
|
||||
pure (YieldRequested machine)
|
||||
|
||||
ASleep msTree ->
|
||||
case toNumber msTree of
|
||||
Right ms | ms >= 0 ->
|
||||
pure (SleepRequested ms machine)
|
||||
_ ->
|
||||
finishValue machine invalidSleepResult
|
||||
|
||||
-- Permission and IO helpers
|
||||
checkReadPerm p =
|
||||
if allowReadAll perms
|
||||
if allowReadAll (rtPerms (machineRuntime machine))
|
||||
then return Nothing
|
||||
else do
|
||||
mp <- canonicalizeSafe p
|
||||
case mp of
|
||||
Left _ -> return $ Just policyErrResult
|
||||
Left _ -> return $ Just policyErrResult
|
||||
Right path -> do
|
||||
allowed <- pathAllowed path (allowRead perms)
|
||||
allowed <- pathAllowed path (allowRead (rtPerms (machineRuntime machine)))
|
||||
if allowed
|
||||
then return Nothing
|
||||
else return $ Just policyErrResult
|
||||
|
||||
checkWritePerm p =
|
||||
if allowWriteAll perms
|
||||
if allowWriteAll (rtPerms (machineRuntime machine))
|
||||
then return Nothing
|
||||
else do
|
||||
mp <- canonicalizeSafe p
|
||||
case mp of
|
||||
Left _ -> return $ Just policyErrResult
|
||||
Left _ -> return $ Just policyErrResult
|
||||
Right path -> do
|
||||
allowed <- pathAllowed path (allowWrite perms)
|
||||
allowed <- pathAllowed path (allowWrite (rtPerms (machineRuntime machine)))
|
||||
if allowed
|
||||
then return Nothing
|
||||
else return $ Just policyErrResult
|
||||
|
||||
policyErrResult = errResult 5
|
||||
policyErrResult = errResult errPolicyDeny
|
||||
|
||||
canonicalizeSafe :: FilePath -> IO (Either String FilePath)
|
||||
canonicalizeSafe p = do
|
||||
@@ -187,12 +498,225 @@ runIO perms actionTree = go actionTree
|
||||
Right () -> return $ okResult Leaf
|
||||
Left e -> return $ errResult (ioErrorCode e)
|
||||
|
||||
okResult val = Fork (Stem Leaf) (Fork val Leaf) -- pair true (pair val t)
|
||||
errResult code = Fork Leaf (Fork (ofNumber code) Leaf) -- pair false (pair code t)
|
||||
decodeString t ctx =
|
||||
case toString t of
|
||||
Right s -> Right s
|
||||
Left _ -> Left $ "Invalid " ++ ctx ++ " string"
|
||||
|
||||
ioErrorCode :: IOException -> Integer
|
||||
ioErrorCode e
|
||||
| isDoesNotExistError e = 1
|
||||
| isPermissionError e = 2
|
||||
| isAlreadyExistsError e = 3
|
||||
| otherwise = 4
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Scheduler
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
data TaskStatus
|
||||
= Runnable Machine
|
||||
| BlockedOn TaskId Machine
|
||||
| Sleeping UTCTime Machine
|
||||
| Completed Runtime T
|
||||
deriving (Show)
|
||||
|
||||
data Scheduler = Scheduler
|
||||
{ schedulerNextTaskId :: Integer
|
||||
, schedulerRunnable :: Seq TaskId
|
||||
, schedulerTasks :: Map TaskId TaskStatus
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
initialScheduler :: Machine -> Scheduler
|
||||
initialScheduler mainMachine =
|
||||
Scheduler
|
||||
{ schedulerNextTaskId = 1
|
||||
, schedulerRunnable = Seq.singleton (TaskId 0)
|
||||
, schedulerTasks = Map.singleton (TaskId 0) (Runnable mainMachine)
|
||||
}
|
||||
|
||||
runtimeOfStatus :: TaskStatus -> Runtime
|
||||
runtimeOfStatus (Runnable machine) = machineRuntime machine
|
||||
runtimeOfStatus (BlockedOn _ machine) = machineRuntime machine
|
||||
runtimeOfStatus (Sleeping _ machine) = machineRuntime machine
|
||||
runtimeOfStatus (Completed runtime _) = runtime
|
||||
|
||||
wakeAwaiters :: TaskId -> T -> Scheduler -> Scheduler
|
||||
wakeAwaiters targetId value scheduler =
|
||||
let (newlyRunnable, tasks') =
|
||||
Map.mapAccumWithKey wakeOne [] (schedulerTasks scheduler)
|
||||
queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable)
|
||||
in scheduler { schedulerTasks = tasks', schedulerRunnable = queue' }
|
||||
where
|
||||
wakeOne acc tid (BlockedOn blockedTarget machine)
|
||||
| blockedTarget == targetId =
|
||||
let machine' = machine { machineCurrent = pureAction value }
|
||||
in (tid : acc, Runnable machine')
|
||||
wakeOne acc _ status = (acc, status)
|
||||
|
||||
wakeDueSleepers :: Scheduler -> IO Scheduler
|
||||
wakeDueSleepers scheduler = do
|
||||
now <- getCurrentTime
|
||||
let (newlyRunnable, tasks') =
|
||||
Map.mapAccumWithKey (wakeOne now) [] (schedulerTasks scheduler)
|
||||
queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable)
|
||||
pure scheduler { schedulerTasks = tasks', schedulerRunnable = queue' }
|
||||
where
|
||||
wakeOne now acc tid (Sleeping wakeTime machine)
|
||||
| wakeTime <= now = (tid : acc, Runnable machine)
|
||||
wakeOne _ acc _ status = (acc, status)
|
||||
|
||||
nearestSleepTime :: Scheduler -> Maybe UTCTime
|
||||
nearestSleepTime = Map.foldl' minSleep Nothing . schedulerTasks
|
||||
where
|
||||
minSleep acc (Sleeping t _) = Just $ maybe t (min t) acc
|
||||
minSleep acc _ = acc
|
||||
|
||||
resumeCurrentWith :: TaskId -> T -> Machine -> Scheduler -> IO Scheduler
|
||||
resumeCurrentWith taskId value machine scheduler =
|
||||
let machine' = machine { machineCurrent = pureAction value }
|
||||
in pure scheduler
|
||||
{ schedulerTasks = Map.insert taskId (Runnable machine') (schedulerTasks scheduler)
|
||||
, schedulerRunnable = schedulerRunnable scheduler |> taskId
|
||||
}
|
||||
|
||||
handleStep :: TaskId -> Step -> Scheduler -> IO Scheduler
|
||||
handleStep taskId (Continue machine) scheduler =
|
||||
pure scheduler
|
||||
{ schedulerTasks = Map.insert taskId (Runnable machine) (schedulerTasks scheduler)
|
||||
, schedulerRunnable = schedulerRunnable scheduler |> taskId
|
||||
}
|
||||
|
||||
handleStep taskId (Halt _runtime value) scheduler =
|
||||
pure (wakeAwaiters taskId value scheduler')
|
||||
where
|
||||
scheduler' = scheduler
|
||||
{ schedulerTasks = Map.insert taskId (Completed _runtime value) (schedulerTasks scheduler)
|
||||
}
|
||||
|
||||
handleStep parentId (ForkRequested childAction parentMachine) scheduler =
|
||||
let childId = TaskId (schedulerNextTaskId scheduler)
|
||||
handle = taskHandle childId
|
||||
|
||||
parentMachine' =
|
||||
parentMachine { machineCurrent = pureAction handle }
|
||||
|
||||
childMachine =
|
||||
Machine
|
||||
{ machineRuntime = machineRuntime parentMachine
|
||||
, machineCurrent = childAction
|
||||
, machineFrames = []
|
||||
}
|
||||
|
||||
tasks' =
|
||||
Map.insert parentId (Runnable parentMachine') $
|
||||
Map.insert childId (Runnable childMachine) $
|
||||
schedulerTasks scheduler
|
||||
|
||||
queue' =
|
||||
schedulerRunnable scheduler |> parentId |> childId
|
||||
|
||||
in pure scheduler
|
||||
{ schedulerNextTaskId = schedulerNextTaskId scheduler + 1
|
||||
, schedulerTasks = tasks'
|
||||
, schedulerRunnable = queue'
|
||||
}
|
||||
|
||||
handleStep currentId (AwaitRequested targetId machine) scheduler
|
||||
| targetId == currentId =
|
||||
resumeCurrentWith currentId selfAwaitResult machine scheduler
|
||||
|
||||
| otherwise =
|
||||
case Map.lookup targetId (schedulerTasks scheduler) of
|
||||
Nothing ->
|
||||
resumeCurrentWith currentId invalidAsyncHandleResult machine scheduler
|
||||
|
||||
Just (Completed _ value) ->
|
||||
resumeCurrentWith currentId value machine scheduler
|
||||
|
||||
Just _ ->
|
||||
pure scheduler
|
||||
{ schedulerTasks =
|
||||
Map.insert currentId (BlockedOn targetId machine) (schedulerTasks scheduler)
|
||||
}
|
||||
|
||||
handleStep taskId (YieldRequested machine) scheduler =
|
||||
resumeCurrentWith taskId Leaf machine scheduler
|
||||
|
||||
handleStep taskId (SleepRequested ms machine) scheduler = do
|
||||
now <- getCurrentTime
|
||||
let seconds = fromIntegral ms / 1000
|
||||
wakeTime = addUTCTime seconds now
|
||||
machine' = machine { machineCurrent = pureAction Leaf }
|
||||
pure scheduler
|
||||
{ schedulerTasks =
|
||||
Map.insert taskId (Sleeping wakeTime machine') (schedulerTasks scheduler)
|
||||
}
|
||||
|
||||
handleNoRunnable :: Scheduler -> IO Scheduler
|
||||
handleNoRunnable scheduler =
|
||||
case nearestSleepTime scheduler of
|
||||
Just wakeTime -> do
|
||||
now <- getCurrentTime
|
||||
let micros = max 0 (floor (diffUTCTime wakeTime now * 1000000))
|
||||
threadDelay micros
|
||||
wakeDueSleepers scheduler
|
||||
|
||||
Nothing ->
|
||||
case Map.lookup (TaskId 0) (schedulerTasks scheduler) of
|
||||
Just status ->
|
||||
pure scheduler
|
||||
{ schedulerTasks =
|
||||
Map.insert (TaskId 0)
|
||||
(Completed (runtimeOfStatus status) deadlockResult)
|
||||
(schedulerTasks scheduler)
|
||||
}
|
||||
Nothing ->
|
||||
pure scheduler
|
||||
|
||||
schedulerStep :: Scheduler -> IO Scheduler
|
||||
schedulerStep scheduler = do
|
||||
scheduler1 <- wakeDueSleepers scheduler
|
||||
case Seq.viewl (schedulerRunnable scheduler1) of
|
||||
EmptyL ->
|
||||
handleNoRunnable scheduler1
|
||||
|
||||
taskId :< restQueue ->
|
||||
case Map.lookup taskId (schedulerTasks scheduler1) of
|
||||
Just (Runnable machine) -> do
|
||||
step <- stepMachine machine
|
||||
handleStep taskId step scheduler1 { schedulerRunnable = restQueue }
|
||||
|
||||
_ ->
|
||||
pure scheduler1 { schedulerRunnable = restQueue }
|
||||
|
||||
runScheduler :: Scheduler -> IO (T, T)
|
||||
runScheduler scheduler =
|
||||
case Map.lookup (TaskId 0) (schedulerTasks scheduler) of
|
||||
Just (Completed runtime value) ->
|
||||
pure (value, rtState runtime)
|
||||
|
||||
_ ->
|
||||
schedulerStep scheduler >>= runScheduler
|
||||
|
||||
-- ---------------------------------------------------------------------------
|
||||
-- Public API
|
||||
-- ---------------------------------------------------------------------------
|
||||
|
||||
runIOWith :: IOPermissions -> T -> T -> T -> IO (T, T)
|
||||
runIOWith perms env initialState action =
|
||||
runScheduler (initialScheduler initialMachine)
|
||||
where
|
||||
initialMachine = Machine
|
||||
{ machineRuntime = Runtime
|
||||
{ rtPerms = perms
|
||||
, rtEnv = env
|
||||
, rtState = initialState
|
||||
}
|
||||
, machineCurrent = action
|
||||
, machineFrames = []
|
||||
}
|
||||
|
||||
runIOWithEnv :: IOPermissions -> T -> T -> IO T
|
||||
runIOWithEnv perms env action = do
|
||||
(result, _) <- runIOWith perms env Leaf action
|
||||
pure result
|
||||
|
||||
runIO :: IOPermissions -> T -> IO T
|
||||
runIO perms action = do
|
||||
(result, _) <- runIOWith perms Leaf Leaf action
|
||||
pure result
|
||||
|
||||
Reference in New Issue
Block a user