Several subtle IODriver bug fixes

This commit is contained in:
2026-05-16 09:33:14 -05:00
parent 8d5e76db1c
commit e2d035286d
6 changed files with 315 additions and 299 deletions

View File

@@ -8,7 +8,7 @@ module IODriver
, runIOWith
) where
import Research (T(..), apply, toString, toNumber, ofString, ofNumber, ofBytes)
import Research (T(..), apply, toString, toNumber, ofString, ofNumber, ofBytes, toBytes)
import qualified Data.ByteString as BS
import System.IO (putStr, getLine)
import qualified System.IO as IO
@@ -22,7 +22,11 @@ import Data.Map.Strict (Map)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), ViewL(..))
import Data.Time.Clock (UTCTime, getCurrentTime, addUTCTime, diffUTCTime)
import Control.Concurrent (threadDelay)
import Control.Concurrent (threadDelay, forkIO)
import Control.Concurrent.STM (TVar, newTVarIO, atomically, readTVar, writeTVar, modifyTVar', retry)
import qualified Data.Set as Set
import Data.Set (Set)
import qualified Data.Foldable as Fold
-- ---------------------------------------------------------------------------
-- Permissions
@@ -115,10 +119,11 @@ errInvalidAction = 40
errInvalidString = 41
-- Async errors (60-79)
errInvalidHandle, errSelfAwait, errInvalidSleep :: Integer
errInvalidHandle, errSelfAwait, errInvalidSleep, errCyclicAwait :: Integer
errInvalidHandle = 60
errSelfAwait = 61
errInvalidSleep = 62
errCyclicAwait = 63
-- Scheduler / runtime errors (80-99)
errDeadlock :: Integer
@@ -182,9 +187,11 @@ data Action
= APure T
| ABind T T
| APutStr T
| APutBytes T
| AGetLine
| AReadFile T
| AWriteFile T T
| AWriteBytes T T
| AAsk
| ALocal T T
| AGet
@@ -203,13 +210,15 @@ tagPure, tagBind :: Integer
tagPure = 0
tagBind = 1
tagPutStr, tagGetLine :: Integer
tagPutStr, tagPutBytes, tagGetLine :: Integer
tagPutStr = 10
tagPutBytes = 12
tagGetLine = 11
tagReadFile, tagWriteFile :: Integer
tagReadFile, tagWriteFile, tagWriteBytes :: Integer
tagReadFile = 20
tagWriteFile = 21
tagWriteBytes = 22
tagAsk, tagLocal :: Integer
tagAsk = 30
@@ -232,7 +241,16 @@ data Step
| AwaitRequested TaskId Machine
| YieldRequested Machine
| SleepRequested Integer Machine
deriving (Show)
| AsyncAction (IO T) Machine
instance Show Step where
show (Halt _ v) = "Halt _ (" ++ show v ++ ")"
show (Continue m) = "Continue (" ++ show m ++ ")"
show (ForkRequested t m) = "ForkRequested (" ++ show t ++ ") (" ++ show m ++ ")"
show (AwaitRequested tid m) = "AwaitRequested " ++ show tid ++ " (" ++ show m ++ ")"
show (YieldRequested m) = "YieldRequested (" ++ show m ++ ")"
show (SleepRequested n m) = "SleepRequested " ++ show n ++ " (" ++ show m ++ ")"
show (AsyncAction _ m) = "AsyncAction <io> (" ++ show m ++ ")"
decodeAction :: T -> Either String Action
decodeAction tree =
@@ -250,6 +268,9 @@ decodeAction tree =
Right n | n == tagPutStr ->
Right (APutStr payload)
Right n | n == tagPutBytes ->
Right (APutBytes payload)
Right n | n == tagGetLine ->
Right AGetLine
@@ -261,6 +282,11 @@ decodeAction tree =
Fork path contents -> Right (AWriteFile path contents)
_ -> Left "Invalid WriteFile: expected pair path contents"
Right n | n == tagWriteBytes ->
case payload of
Fork path contents -> Right (AWriteBytes path contents)
_ -> Left "Invalid WriteBytes: expected pair path contents"
Right n | n == tagAsk ->
Right AAsk
@@ -338,15 +364,20 @@ stepMachine machine =
APutStr str ->
case decodeString str "PutStr" of
Right s -> do
putStr s
finishValue machine Leaf
Right s ->
pure (AsyncAction (putStr s >> pure Leaf) machine)
Left _ ->
finishValue machine (errResult errInvalidString)
AGetLine -> do
line <- getLine
finishValue machine (ofString line)
APutBytes bs ->
case decodeBytes bs "PutBytes" of
Right b ->
pure (AsyncAction (BS.putStr b >> pure Leaf) machine)
Left _ ->
finishValue machine (errResult errInvalidString)
AGetLine ->
pure (AsyncAction (ofString <$> getLine) machine)
AReadFile path ->
case decodeString path "ReadFile" of
@@ -354,7 +385,7 @@ stepMachine machine =
mDeny <- checkReadPerm p
case mDeny of
Just denied -> finishValue machine denied
Nothing -> tryReadFile p >>= finishValue machine
Nothing -> pure (AsyncAction (tryReadFile p) machine)
Left _ -> finishValue machine (errResult errInvalidString)
AWriteFile path contents ->
@@ -365,7 +396,19 @@ stepMachine machine =
mDeny <- checkWritePerm p
case mDeny of
Just denied -> finishValue machine denied
Nothing -> tryWriteFile p c >>= finishValue machine
Nothing -> pure (AsyncAction (tryWriteFile p c) machine)
Left _ -> finishValue machine (errResult errInvalidString)
Left _ -> finishValue machine (errResult errInvalidString)
AWriteBytes path contents ->
case decodeString path "WriteBytes" of
Right p ->
case decodeBytes contents "WriteBytes" of
Right c -> do
mDeny <- checkWritePerm p
case mDeny of
Just denied -> finishValue machine denied
Nothing -> pure (AsyncAction (tryWriteFileBytes p c) machine)
Left _ -> finishValue machine (errResult errInvalidString)
Left _ -> finishValue machine (errResult errInvalidString)
@@ -499,11 +542,22 @@ stepMachine machine =
Right () -> return $ okResult Leaf
Left e -> return $ errResult (ioErrorCode e)
tryWriteFileBytes path contents = do
result <- try (BS.writeFile path contents) :: IO (Either IOException ())
case result of
Right () -> return $ okResult Leaf
Left e -> return $ errResult (ioErrorCode e)
decodeString t ctx =
case toString t of
Right s -> Right s
Left _ -> Left $ "Invalid " ++ ctx ++ " string"
decodeBytes t ctx =
case toBytes t of
Right b -> Right b
Left _ -> Left $ "Invalid " ++ ctx ++ " bytes"
-- ---------------------------------------------------------------------------
-- Scheduler
-- ---------------------------------------------------------------------------
@@ -512,60 +566,101 @@ data TaskStatus
= Runnable Machine
| BlockedOn TaskId Machine
| Sleeping UTCTime Machine
| Completed Runtime T
| AsyncWaiting Machine
deriving (Show)
data Scheduler = Scheduler
{ schedulerNextTaskId :: Integer
, schedulerRunnable :: Seq TaskId
, schedulerTasks :: Map TaskId TaskStatus
, schedulerWaiters :: Map TaskId (Seq TaskId)
, schedulerSleepQueue :: Map UTCTime (Set TaskId)
, schedulerAsyncCompleted :: TVar (Map TaskId T)
, schedulerCompleted :: Map TaskId (T, T)
}
deriving (Show)
initialScheduler :: Machine -> Scheduler
initialScheduler mainMachine =
instance Show Scheduler where
show s = "Scheduler { schedulerNextTaskId = " ++ show (schedulerNextTaskId s)
++ ", schedulerRunnable = " ++ show (schedulerRunnable s)
++ ", schedulerTasks = " ++ show (schedulerTasks s)
++ ", schedulerWaiters = " ++ show (schedulerWaiters s)
++ ", schedulerSleepQueue = " ++ show (schedulerSleepQueue s)
++ ", schedulerAsyncCompleted = <tvar>"
++ ", schedulerCompleted = " ++ show (schedulerCompleted s)
++ " }"
initialScheduler :: TVar (Map TaskId T) -> Machine -> Scheduler
initialScheduler asyncVar mainMachine =
Scheduler
{ schedulerNextTaskId = 1
, schedulerRunnable = Seq.singleton (TaskId 0)
, schedulerTasks = Map.singleton (TaskId 0) (Runnable mainMachine)
, schedulerWaiters = Map.empty
, schedulerSleepQueue = Map.empty
, schedulerAsyncCompleted = asyncVar
, schedulerCompleted = Map.empty
}
runtimeOfStatus :: TaskStatus -> Runtime
runtimeOfStatus (Runnable machine) = machineRuntime machine
runtimeOfStatus (BlockedOn _ machine) = machineRuntime machine
runtimeOfStatus (Sleeping _ machine) = machineRuntime machine
runtimeOfStatus (Completed runtime _) = runtime
runtimeOfStatus :: TaskStatus -> Maybe Runtime
runtimeOfStatus (Runnable machine) = Just (machineRuntime machine)
runtimeOfStatus (BlockedOn _ machine) = Just (machineRuntime machine)
runtimeOfStatus (Sleeping _ machine) = Just (machineRuntime machine)
runtimeOfStatus (AsyncWaiting machine) = Just (machineRuntime machine)
wakeAwaiters :: TaskId -> T -> Scheduler -> Scheduler
wakeAwaiters targetId value scheduler =
let (newlyRunnable, tasks') =
Map.mapAccumWithKey wakeOne [] (schedulerTasks scheduler)
queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable)
in scheduler { schedulerTasks = tasks', schedulerRunnable = queue' }
case Map.lookup targetId (schedulerWaiters scheduler) of
Nothing -> scheduler
Just waiters ->
let (tasks', queue') = Fold.foldl' (wakeOne targetId value)
(schedulerTasks scheduler, schedulerRunnable scheduler)
waiters
in scheduler
{ schedulerTasks = tasks'
, schedulerRunnable = queue'
, schedulerWaiters = Map.delete targetId (schedulerWaiters scheduler)
}
where
wakeOne acc tid (BlockedOn blockedTarget machine)
| blockedTarget == targetId =
wakeOne _ _ (tasks, queue) waiterId =
case Map.lookup waiterId tasks of
Just (BlockedOn _ machine) ->
let machine' = machine { machineCurrent = pureAction value }
in (tid : acc, Runnable machine')
wakeOne acc _ status = (acc, status)
in (Map.insert waiterId (Runnable machine') tasks, queue |> waiterId)
_ -> (tasks, queue)
wakeDueSleepers :: Scheduler -> IO Scheduler
wakeDueSleepers scheduler = do
now <- getCurrentTime
let (newlyRunnable, tasks') =
Map.mapAccumWithKey (wakeOne now) [] (schedulerTasks scheduler)
queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable)
pure scheduler { schedulerTasks = tasks', schedulerRunnable = queue' }
where
wakeOne now acc tid (Sleeping wakeTime machine)
| wakeTime <= now = (tid : acc, Runnable machine)
wakeOne _ acc _ status = (acc, status)
let go sq accTasks accQueue =
case Map.lookupMin sq of
Nothing -> (accTasks, accQueue, sq)
Just (t, taskSet)
| t <= now ->
let tasks' = Fold.foldl' (\m tid ->
case Map.lookup tid m of
Just (Sleeping _ machine) -> Map.insert tid (Runnable machine) m
_ -> m
) accTasks (Set.toList taskSet)
queue' = Fold.foldl' (|>) accQueue (Set.toList taskSet)
in go (Map.deleteMin sq) tasks' queue'
| otherwise -> (accTasks, accQueue, sq)
(tasks', queue', sq') = go (schedulerSleepQueue scheduler)
(schedulerTasks scheduler)
(schedulerRunnable scheduler)
pure scheduler
{ schedulerTasks = tasks'
, schedulerRunnable = queue'
, schedulerSleepQueue = sq'
}
nearestSleepTime :: Scheduler -> Maybe UTCTime
nearestSleepTime = Map.foldl' minSleep Nothing . schedulerTasks
nearestSleepTime = fmap fst . Map.lookupMin . schedulerSleepQueue
hasAsyncWaiters :: Scheduler -> Bool
hasAsyncWaiters = any isAsync . Map.elems . schedulerTasks
where
minSleep acc (Sleeping t _) = Just $ maybe t (min t) acc
minSleep acc _ = acc
isAsync (AsyncWaiting _) = True
isAsync _ = False
resumeCurrentWith :: TaskId -> T -> Machine -> Scheduler -> IO Scheduler
resumeCurrentWith taskId value machine scheduler =
@@ -575,6 +670,13 @@ resumeCurrentWith taskId value machine scheduler =
, schedulerRunnable = schedulerRunnable scheduler |> taskId
}
wouldCycle :: TaskId -> TaskId -> Map TaskId TaskStatus -> Bool
wouldCycle target current tasks =
case Map.lookup target tasks of
Just (BlockedOn next _) ->
next == current || wouldCycle next current tasks
_ -> False
handleStep :: TaskId -> Step -> Scheduler -> IO Scheduler
handleStep taskId (Continue machine) scheduler =
pure scheduler
@@ -582,12 +684,12 @@ handleStep taskId (Continue machine) scheduler =
, schedulerRunnable = schedulerRunnable scheduler |> taskId
}
handleStep taskId (Halt _runtime value) scheduler =
pure (wakeAwaiters taskId value scheduler')
where
scheduler' = scheduler
{ schedulerTasks = Map.insert taskId (Completed _runtime value) (schedulerTasks scheduler)
}
handleStep taskId (Halt runtime value) scheduler =
let scheduler' = wakeAwaiters taskId value scheduler
in pure scheduler'
{ schedulerTasks = Map.delete taskId (schedulerTasks scheduler')
, schedulerCompleted = Map.insert taskId (value, rtState runtime) (schedulerCompleted scheduler')
}
handleStep parentId (ForkRequested childAction parentMachine) scheduler =
let childId = TaskId (schedulerNextTaskId scheduler)
@@ -618,22 +720,29 @@ handleStep parentId (ForkRequested childAction parentMachine) scheduler =
}
handleStep currentId (AwaitRequested targetId machine) scheduler
| targetId == currentId =
| currentId == targetId =
resumeCurrentWith currentId selfAwaitResult machine scheduler
| otherwise =
case Map.lookup targetId (schedulerTasks scheduler) of
Nothing ->
resumeCurrentWith currentId invalidAsyncHandleResult machine scheduler
case Map.lookup targetId (schedulerCompleted scheduler) of
Just (value, _) -> resumeCurrentWith currentId value machine scheduler
Nothing -> resumeCurrentWith currentId invalidAsyncHandleResult machine scheduler
Just (Completed _ value) ->
resumeCurrentWith currentId value machine scheduler
Just (BlockedOn nextId _) ->
if wouldCycle targetId currentId (schedulerTasks scheduler)
then resumeCurrentWith currentId (errResult errCyclicAwait) machine scheduler
else block
Just _ ->
pure scheduler
{ schedulerTasks =
Map.insert currentId (BlockedOn targetId machine) (schedulerTasks scheduler)
}
Just _ -> block
where
block = pure scheduler
{ schedulerTasks = Map.insert currentId (BlockedOn targetId machine) (schedulerTasks scheduler)
, schedulerWaiters = Map.alter addWaiter targetId (schedulerWaiters scheduler)
}
addWaiter Nothing = Just (Seq.singleton currentId)
addWaiter (Just sq) = Just (sq |> currentId)
handleStep taskId (YieldRequested machine) scheduler =
resumeCurrentWith taskId Leaf machine scheduler
@@ -644,8 +753,16 @@ handleStep taskId (SleepRequested ms machine) scheduler = do
wakeTime = addUTCTime seconds now
machine' = machine { machineCurrent = pureAction Leaf }
pure scheduler
{ schedulerTasks =
Map.insert taskId (Sleeping wakeTime machine') (schedulerTasks scheduler)
{ schedulerTasks = Map.insert taskId (Sleeping wakeTime machine') (schedulerTasks scheduler)
, schedulerSleepQueue = Map.alter (Just . maybe (Set.singleton taskId) (Set.insert taskId)) wakeTime (schedulerSleepQueue scheduler)
}
handleStep taskId (AsyncAction ioAction machine) scheduler = do
_ <- forkIO $ do
result <- ioAction
atomically $ modifyTVar' (schedulerAsyncCompleted scheduler) (Map.insert taskId result)
pure scheduler
{ schedulerTasks = Map.insert taskId (AsyncWaiting machine) (schedulerTasks scheduler)
}
handleNoRunnable :: Scheduler -> IO Scheduler
@@ -658,20 +775,42 @@ handleNoRunnable scheduler =
wakeDueSleepers scheduler
Nothing ->
case Map.lookup (TaskId 0) (schedulerTasks scheduler) of
Just status ->
pure scheduler
{ schedulerTasks =
Map.insert (TaskId 0)
(Completed (runtimeOfStatus status) deadlockResult)
(schedulerTasks scheduler)
}
Nothing ->
if hasAsyncWaiters scheduler
then do
-- Block efficiently until at least one async operation completes.
atomically $ do
m <- readTVar (schedulerAsyncCompleted scheduler)
if Map.null m then retry else return ()
pure scheduler
else
case Map.lookup (TaskId 0) (schedulerTasks scheduler) of
Just status ->
case runtimeOfStatus status of
Just runtime ->
let scheduler' = wakeAwaiters (TaskId 0) deadlockResult scheduler
in pure scheduler'
{ schedulerTasks = Map.delete (TaskId 0) (schedulerTasks scheduler')
, schedulerCompleted = Map.insert (TaskId 0) (deadlockResult, rtState runtime) (schedulerCompleted scheduler')
}
Nothing -> pure scheduler
Nothing -> pure scheduler
schedulerStep :: Scheduler -> IO Scheduler
schedulerStep scheduler = do
scheduler1 <- wakeDueSleepers scheduler
-- Poll completed async operations and resume their tasks.
completed <- atomically $ do
m <- readTVar (schedulerAsyncCompleted scheduler)
writeTVar (schedulerAsyncCompleted scheduler) Map.empty
return m
schedulerAfterAsync <- Fold.foldlM
(\s (tid, val) ->
case Map.lookup tid (schedulerTasks s) of
Just (AsyncWaiting machine) -> resumeCurrentWith tid val machine s
_ -> pure s)
scheduler
(Map.toList completed)
scheduler1 <- wakeDueSleepers schedulerAfterAsync
case Seq.viewl (schedulerRunnable scheduler1) of
EmptyL ->
handleNoRunnable scheduler1
@@ -687,9 +826,9 @@ schedulerStep scheduler = do
runScheduler :: Scheduler -> IO (T, T)
runScheduler scheduler =
case Map.lookup (TaskId 0) (schedulerTasks scheduler) of
Just (Completed runtime value) ->
pure (value, rtState runtime)
case Map.lookup (TaskId 0) (schedulerCompleted scheduler) of
Just (value, finalState) ->
pure (value, finalState)
_ ->
schedulerStep scheduler >>= runScheduler
@@ -698,26 +837,29 @@ runScheduler scheduler =
-- Public API
-- ---------------------------------------------------------------------------
runIOWith :: IOPermissions -> T -> T -> T -> IO (T, T)
runIOWith :: IOPermissions -> T -> T -> T -> IO (Either String (T, T))
runIOWith perms env initialState action =
runScheduler (initialScheduler initialMachine)
where
initialMachine = Machine
{ machineRuntime = Runtime
{ rtPerms = perms
, rtEnv = env
, rtState = initialState
}
, machineCurrent = action
, machineFrames = []
}
case checkIOSentinel action of
Left err -> pure (Left err)
Right (_, action') -> do
asyncVar <- newTVarIO Map.empty
let initialMachine = Machine
{ machineRuntime = Runtime
{ rtPerms = perms
, rtEnv = env
, rtState = initialState
}
, machineCurrent = action'
, machineFrames = []
}
Right <$> runScheduler (initialScheduler asyncVar initialMachine)
runIOWithEnv :: IOPermissions -> T -> T -> IO T
runIOWithEnv :: IOPermissions -> T -> T -> IO (Either String T)
runIOWithEnv perms env action = do
(result, _) <- runIOWith perms env Leaf action
pure result
result <- runIOWith perms env Leaf action
pure (fmap fst result)
runIO :: IOPermissions -> T -> IO T
runIO :: IOPermissions -> T -> IO (Either String T)
runIO perms action = do
(result, _) <- runIOWith perms Leaf Leaf action
pure result
result <- runIOWith perms Leaf Leaf action
pure (fmap fst result)

View File

@@ -5,7 +5,7 @@ import System.Exit (die)
import Server (runServerWithPath)
import Eval (evalTricu, evalTricuWithStore, mainResult, result)
import FileEval (evaluateFileWithContext, evaluateFileWithStore, compileFile)
import IODriver (IOPermissions(..), checkIOSentinel, runIO)
import IODriver (IOPermissions(..), runIO)
import Parser (parseTricu)
import REPL (repl)
import Research (T, EvaluatedForm(..), Env, formatT, exportDag)
@@ -307,17 +307,17 @@ runEval opts = do
finalEnv <- foldM (evaluateFileWithStore mconn) Map.empty files
return $ mainResult finalEnv
finalT <- if evalIo opts
then case checkIOSentinel resultT of
Right (1, action) -> do
let perms = IOPermissions
{ allowRead = evalAllowRead opts
, allowWrite = evalAllowWrite opts
, allowReadAll = evalUnsafeIo opts || evalAllowReadAll opts
, allowWriteAll = evalUnsafeIo opts || evalAllowWriteAll opts
}
runIO perms action
Right (v, _) -> die $ "Unsupported IO ABI version: " ++ show v
Left err -> die $ "IO mode requested but " ++ err
then do
let perms = IOPermissions
{ allowRead = evalAllowRead opts
, allowWrite = evalAllowWrite opts
, allowReadAll = evalUnsafeIo opts || evalAllowReadAll opts
, allowWriteAll = evalUnsafeIo opts || evalAllowWriteAll opts
}
result <- runIO perms resultT
case result of
Left err -> die $ "IO error: " ++ err
Right val -> pure val
else return resultT
case mconn of
Just conn -> close conn