From e2d035286d74b71d00437231674ee043397a8cf0 Mon Sep 17 00:00:00 2001 From: James Eversole Date: Sat, 16 May 2026 09:33:14 -0500 Subject: [PATCH] Several subtle IODriver bug fixes --- demos/interactionTrees/getLineAsync.tri | 25 ++ docs/zig-io.md | 193 -------------- lib/io.tri | 2 + src/IODriver.hs | 322 +++++++++++++++++------- src/Main.hs | 24 +- test/Spec.hs | 48 +++- 6 files changed, 315 insertions(+), 299 deletions(-) create mode 100644 demos/interactionTrees/getLineAsync.tri delete mode 100644 docs/zig-io.md diff --git a/demos/interactionTrees/getLineAsync.tri b/demos/interactionTrees/getLineAsync.tri new file mode 100644 index 0000000..bb9809d --- /dev/null +++ b/demos/interactionTrees/getLineAsync.tri @@ -0,0 +1,25 @@ +-- Manual test for async getLine +-- +-- Run with: +-- nix run .# -- eval -f demos/async-getline-test.tri --io +-- +-- Expected behaviour: +-- 1. You immediately see: +-- Please enter your first name: +-- (this printed before you typed anything) +-- (this second line also printed before you typed anything) +-- 2. You type your name and press Enter. +-- 3. You see: +-- Hello, ! + +!import "../lib/io.tri" !Local + +main = io <| + bind (fork getLine) (h : + bind (putStr "Please enter your first name: ") (_ : + bind (putStr "\n(this printed before you typed anything)\n") (_ : + bind (putStr "\n(this second line also printed before you typed anything)\n") (_ : + bind (await h) (name : + bind (putStr "Hello, ") (_ : + bind (putStr name) (_ : + putStr "!\n"))))))) diff --git a/docs/zig-io.md b/docs/zig-io.md deleted file mode 100644 index f0fbe00..0000000 --- a/docs/zig-io.md +++ /dev/null @@ -1,193 +0,0 @@ -# Zig Interaction-Tree IO Runtime Plan - -## Goal - -Port the Haskell `IODriver` interaction-tree system into the Zig host so that: - -1. The Zig CLI (`tricu-zig`) can execute tricu programs with effects (`putStr`, `readFile`, `fork`, etc.). -2. The C FFI (`libarboricx`) exposes a single `arb_run_io` call, giving every language host (C, Python, PHP, Node) turnkey IO without reimplementing the protocol. -3. The fast native reduction path (currently ~0.005s for `id "hello"`) is used for pure computation; IO syscalls happen only at effect boundaries. - -## Current State - -| Host | Reduction Speed | IO Support | -|------|----------------|------------| -| Haskell interpreter | ~1.7s for `runArboricxTyped` demo | Full `IODriver.hs` with scheduler, async, permissions | -| Zig native | ~0.005s for `append` | None — pure reduction only | -| Zig kernel | ~0.235s for `id.arboricx` | None — runs self-hosted parser, no effects | -| C / Python / PHP FFI | Native Zig speed | None — can construct and reduce, cannot interpret interaction trees | - -The Haskell `IODriver` is ~500 lines of stateful code (scheduler, frame stack, permission checks, async lifecycle). Replicating it in every host language is a maintenance hazard. We will implement it **once** in Zig and share it through the C ABI. - -## Architecture - -### Layer 1 — Tree Inspection Primitives (C FFI) - -Minimal functions that let C (or other FFIs) inspect raw tree shape. Used internally by the driver, and exposed for non-POSIX hosts that need custom effect handlers. - -```c -int arb_is_leaf(arb_ctx_t* ctx, uint32_t root); -int arb_is_stem(arb_ctx_t* ctx, uint32_t root); -int arb_is_fork(arb_ctx_t* ctx, uint32_t root); -int arb_get_stem_child(arb_ctx_t* ctx, uint32_t root, uint32_t* out); -int arb_get_fork_children(arb_ctx_t* ctx, uint32_t root, - uint32_t* out_left, uint32_t* out_right); -``` - -### Layer 2 — POSIX IO Driver (C FFI) - -A single high-level call that runs the full interaction-tree loop: - -```c -typedef struct { - int allow_read_all; - int allow_write_all; - const char** allowed_read_paths; - size_t allowed_read_count; - const char** allowed_write_paths; - size_t allowed_write_count; -} arb_io_perms_t; - -// Reduce → decode action → perform syscall → feed result → repeat until pure. -// Returns the final pure tree value. -uint32_t arb_run_io(arb_ctx_t* ctx, uint32_t program, - const arb_io_perms_t* perms); -``` - -This is the only call 99% of hosts need. It contains the exact same logic as `IODriver.hs`: - -- **Frame stack** — `BindFrame` (sequencing) and `LocalFrame` (environment scoping) -- **Runtime** — permissions, environment tree, mutable state tree -- **Action dispatch** — decode the tag (pure, bind, putStr, getLine, readFile, writeFile, ask, local, get, put, fork, await, yield, sleep) -- **Scheduler** — runnable queue, blocked tasks, sleeping tasks, wake-on-completion, deadlock detection -- **Error protocol** — ok/err pairs with numeric codes - -### Zig CLI Integration - -Add `--io` and `--unsafe-io` flags to `tricu-zig`: - -```bash -# Safe mode — no filesystem access (default when --io is used) -tricu-zig --io greet.arboricx - -# Unsafe mode — full POSIX access (development / local scripts) -tricu-zig --io --unsafe-io writeThenRead.arboricx - -# Specific paths -# (future: --allow-read ./foo --allow-write ./bar) -``` - -Under `--io`, the CLI loads the bundle, reduces it once to WHNF, then passes the root to `arb_run_io` instead of eagerly decoding the final value. - -## Implementation Stages - -### Stage 1 — Tree Inspection Primitives - -Add the five inspection functions to `ext/zig/src/c_abi.zig` and `ext/zig/include/arboricx.h`. No logic changes to reduction; these just read arena node tags. - -**Acceptance:** A C test program can walk an arbitrary tree built with `arb_fork`/`arb_stem`/`arb_leaf` without knowing the arena internals. - -### Stage 2 — IO Protocol Decoder - -Write `ext/zig/src/io_driver.zig` containing: - -- `decodeAction` — inspect a reduced tree and identify the action tag (pure=0, bind=1, putStr=10, …) -- `isIOSentinel` — verify `"tricuIO"` sentinel and version -- `makePure`, `makeOkResult`, `makeErrResult` — construct standard response trees - -These are pure Zig functions with no syscalls. They mirror `IODriver.hs` logic but operate on arena indices. - -**Acceptance:** Unit tests decode each action type correctly from trees built via codecs. - -### Stage 3 — Synchronous IO Loop - -Implement the core driver loop with a frame stack: - -```zig -while (true) { - current = reduce.reduce(current, scratch_arena, fuel); - if (isIOSentinel(current)) |action| { - switch (decodeAction(action)) { - .pure => { /* pop frame or return */ }, - .bind => { /* push BindFrame, recurse into left */ }, - .putStr => { /* write stdout, continue with Leaf */ }, - .getLine => { /* read stdin, continue with string */ }, - // ... etc - } - } else { - return current; // pure result - } -} -``` - -Support synchronous actions only: `pure`, `bind`, `putStr`, `getLine`, `readFile`, `writeFile`, `ask`, `local`, `get`, `put`. - -**Acceptance:** `greet.tri` and `writeThenRead.tri` run correctly through `tricu-zig --io`. - -### Stage 4 — Scheduler and Async Actions - -Add the task scheduler for `fork`, `await`, `yield`, `sleep`: - -- `Runnable` queue (FIFO) -- `BlockedOn` map (task → blocked task ID) -- `Sleeping` map (task → wake time) -- Round-robin scheduling with `yield` and `sleep` support -- Deadlock detection when no runnable tasks remain and no sleepers - -This mirrors `IODriver.hs` exactly, including task handle encoding (`Fork("task", n)`). - -**Acceptance:** `demos/interactionTrees/forkAwait.tri` and `yield.tri` pass. - -### Stage 5 — Permission System - -Port path canonicalization and permission checks from Haskell: - -- Syntactic normalization (resolve `.`, reject `..`) -- `--unsafe-io` bypass (allow all) -- `--allow-read PATH` / `--allow-write PATH` allowlists -- Error code 20 (`errPolicyDeny`) on violation - -**Acceptance:** File operations outside allowed paths return err pairs, not crashes. - -### Stage 6 — FFI Integration and Host Rollout - -- Expose `arb_run_io` in the C header -- Update Python FFI test to verify IO round-trip -- Update PHP wrapper to support `--io` -- Document the two-layer model for future hosts (use `arb_run_io` for POSIX, Layer 1 primitives for custom runtimes) - -**Acceptance:** Every existing FFI test still passes; new IO test passes in Python. - -## Design Decisions - -### Why baked-in POSIX effects? - -- Most hosts (C, Python, PHP, native CLI) want real stdout/stdin/files. -- One canonical implementation avoids divergence. -- The Haskell `IODriver.hs` remains the reference spec; the Zig driver is the production runtime. - -### Why not callback-based by default? - -Callbacks add complexity for the common case. If a non-POSIX host (e.g., browser JS) needs custom effects, it can use the Layer 1 inspection primitives to build a ~50-line shim without reimplementing the scheduler. We can add `arb_run_io_with_callbacks` later if demand exists. - -### Why not implement in every host language? - -The Haskell `IODriver` is subtle: frame stack unwinding, async lifecycle, deadlock detection, path canonicalization, error code protocol. Bugs in any reimplementation would fracture the language ecosystem. A shared native driver is the only maintainable answer. - -## Risks and Open Questions - -1. **Fuel exhaustion during IO loops** — `arb_run_io` internally calls `reduce.reduce` with a fuel parameter. Should it accept a total fuel budget, or reset fuel per reduction step? The Haskell side has no fuel limit; we may want `arb_run_io_unlimited` and `arb_run_io_fueled` variants. - -2. **State threading** — The Haskell driver threads an environment and mutable state tree through the runtime. These are opaque `T` values manipulated by tricu code. The Zig driver must preserve them exactly across scheduler switches. - -3. **Binary vs text I/O** — `readFile` currently returns bytes (via `ofBytes` / `toString` in Haskell). The Zig driver must match the encoding exactly so that tricu code sees the same values in both hosts. - -4. **Error parity** — Every error code (1–99) and its corresponding tree shape must match Haskell exactly. Divergence here breaks cross-host compatibility. - -## Success Criteria - -- `tricu-zig --io demos/interactionTrees/greet.tri` prints `Hello, tricu` in <10ms. -- `tricu-zig --io --unsafe-io demos/interactionTrees/writeThenRead.tri` writes and reads back a temp file correctly. -- `tricu-zig --io --unsafe-io demos/interactionTrees/forkAwait.tri` completes with correct async results. -- Python FFI can call `arb_run_io` and observe stdout from a tricu program. -- No regression in pure-reduction benchmarks (native path still ~0.005s for `id`). diff --git a/lib/io.tri b/lib/io.tri index 97f2a53..9dcd9a9 100644 --- a/lib/io.tri +++ b/lib/io.tri @@ -17,6 +17,8 @@ getLine = pair 11 t readFile = p : pair 20 p writeFile = p c : pair 21 (pair p c) +putBytes = bs : pair 12 bs +writeBytes = p c : pair 22 (pair p c) ask = pair 30 t local = f action : pair 31 (pair f action) diff --git a/src/IODriver.hs b/src/IODriver.hs index 73e9404..ead662e 100644 --- a/src/IODriver.hs +++ b/src/IODriver.hs @@ -8,7 +8,7 @@ module IODriver , runIOWith ) where -import Research (T(..), apply, toString, toNumber, ofString, ofNumber, ofBytes) +import Research (T(..), apply, toString, toNumber, ofString, ofNumber, ofBytes, toBytes) import qualified Data.ByteString as BS import System.IO (putStr, getLine) import qualified System.IO as IO @@ -22,7 +22,11 @@ import Data.Map.Strict (Map) import qualified Data.Sequence as Seq import Data.Sequence (Seq, (|>), ViewL(..)) import Data.Time.Clock (UTCTime, getCurrentTime, addUTCTime, diffUTCTime) -import Control.Concurrent (threadDelay) +import Control.Concurrent (threadDelay, forkIO) +import Control.Concurrent.STM (TVar, newTVarIO, atomically, readTVar, writeTVar, modifyTVar', retry) +import qualified Data.Set as Set +import Data.Set (Set) +import qualified Data.Foldable as Fold -- --------------------------------------------------------------------------- -- Permissions @@ -115,10 +119,11 @@ errInvalidAction = 40 errInvalidString = 41 -- Async errors (60-79) -errInvalidHandle, errSelfAwait, errInvalidSleep :: Integer +errInvalidHandle, errSelfAwait, errInvalidSleep, errCyclicAwait :: Integer errInvalidHandle = 60 errSelfAwait = 61 errInvalidSleep = 62 +errCyclicAwait = 63 -- Scheduler / runtime errors (80-99) errDeadlock :: Integer @@ -182,9 +187,11 @@ data Action = APure T | ABind T T | APutStr T + | APutBytes T | AGetLine | AReadFile T | AWriteFile T T + | AWriteBytes T T | AAsk | ALocal T T | AGet @@ -203,13 +210,15 @@ tagPure, tagBind :: Integer tagPure = 0 tagBind = 1 -tagPutStr, tagGetLine :: Integer +tagPutStr, tagPutBytes, tagGetLine :: Integer tagPutStr = 10 +tagPutBytes = 12 tagGetLine = 11 -tagReadFile, tagWriteFile :: Integer +tagReadFile, tagWriteFile, tagWriteBytes :: Integer tagReadFile = 20 tagWriteFile = 21 +tagWriteBytes = 22 tagAsk, tagLocal :: Integer tagAsk = 30 @@ -232,7 +241,16 @@ data Step | AwaitRequested TaskId Machine | YieldRequested Machine | SleepRequested Integer Machine - deriving (Show) + | AsyncAction (IO T) Machine + +instance Show Step where + show (Halt _ v) = "Halt _ (" ++ show v ++ ")" + show (Continue m) = "Continue (" ++ show m ++ ")" + show (ForkRequested t m) = "ForkRequested (" ++ show t ++ ") (" ++ show m ++ ")" + show (AwaitRequested tid m) = "AwaitRequested " ++ show tid ++ " (" ++ show m ++ ")" + show (YieldRequested m) = "YieldRequested (" ++ show m ++ ")" + show (SleepRequested n m) = "SleepRequested " ++ show n ++ " (" ++ show m ++ ")" + show (AsyncAction _ m) = "AsyncAction (" ++ show m ++ ")" decodeAction :: T -> Either String Action decodeAction tree = @@ -250,6 +268,9 @@ decodeAction tree = Right n | n == tagPutStr -> Right (APutStr payload) + Right n | n == tagPutBytes -> + Right (APutBytes payload) + Right n | n == tagGetLine -> Right AGetLine @@ -261,6 +282,11 @@ decodeAction tree = Fork path contents -> Right (AWriteFile path contents) _ -> Left "Invalid WriteFile: expected pair path contents" + Right n | n == tagWriteBytes -> + case payload of + Fork path contents -> Right (AWriteBytes path contents) + _ -> Left "Invalid WriteBytes: expected pair path contents" + Right n | n == tagAsk -> Right AAsk @@ -338,15 +364,20 @@ stepMachine machine = APutStr str -> case decodeString str "PutStr" of - Right s -> do - putStr s - finishValue machine Leaf + Right s -> + pure (AsyncAction (putStr s >> pure Leaf) machine) Left _ -> finishValue machine (errResult errInvalidString) - AGetLine -> do - line <- getLine - finishValue machine (ofString line) + APutBytes bs -> + case decodeBytes bs "PutBytes" of + Right b -> + pure (AsyncAction (BS.putStr b >> pure Leaf) machine) + Left _ -> + finishValue machine (errResult errInvalidString) + + AGetLine -> + pure (AsyncAction (ofString <$> getLine) machine) AReadFile path -> case decodeString path "ReadFile" of @@ -354,7 +385,7 @@ stepMachine machine = mDeny <- checkReadPerm p case mDeny of Just denied -> finishValue machine denied - Nothing -> tryReadFile p >>= finishValue machine + Nothing -> pure (AsyncAction (tryReadFile p) machine) Left _ -> finishValue machine (errResult errInvalidString) AWriteFile path contents -> @@ -365,7 +396,19 @@ stepMachine machine = mDeny <- checkWritePerm p case mDeny of Just denied -> finishValue machine denied - Nothing -> tryWriteFile p c >>= finishValue machine + Nothing -> pure (AsyncAction (tryWriteFile p c) machine) + Left _ -> finishValue machine (errResult errInvalidString) + Left _ -> finishValue machine (errResult errInvalidString) + + AWriteBytes path contents -> + case decodeString path "WriteBytes" of + Right p -> + case decodeBytes contents "WriteBytes" of + Right c -> do + mDeny <- checkWritePerm p + case mDeny of + Just denied -> finishValue machine denied + Nothing -> pure (AsyncAction (tryWriteFileBytes p c) machine) Left _ -> finishValue machine (errResult errInvalidString) Left _ -> finishValue machine (errResult errInvalidString) @@ -499,11 +542,22 @@ stepMachine machine = Right () -> return $ okResult Leaf Left e -> return $ errResult (ioErrorCode e) + tryWriteFileBytes path contents = do + result <- try (BS.writeFile path contents) :: IO (Either IOException ()) + case result of + Right () -> return $ okResult Leaf + Left e -> return $ errResult (ioErrorCode e) + decodeString t ctx = case toString t of Right s -> Right s Left _ -> Left $ "Invalid " ++ ctx ++ " string" + decodeBytes t ctx = + case toBytes t of + Right b -> Right b + Left _ -> Left $ "Invalid " ++ ctx ++ " bytes" + -- --------------------------------------------------------------------------- -- Scheduler -- --------------------------------------------------------------------------- @@ -512,60 +566,101 @@ data TaskStatus = Runnable Machine | BlockedOn TaskId Machine | Sleeping UTCTime Machine - | Completed Runtime T + | AsyncWaiting Machine deriving (Show) data Scheduler = Scheduler { schedulerNextTaskId :: Integer , schedulerRunnable :: Seq TaskId , schedulerTasks :: Map TaskId TaskStatus + , schedulerWaiters :: Map TaskId (Seq TaskId) + , schedulerSleepQueue :: Map UTCTime (Set TaskId) + , schedulerAsyncCompleted :: TVar (Map TaskId T) + , schedulerCompleted :: Map TaskId (T, T) } - deriving (Show) -initialScheduler :: Machine -> Scheduler -initialScheduler mainMachine = +instance Show Scheduler where + show s = "Scheduler { schedulerNextTaskId = " ++ show (schedulerNextTaskId s) + ++ ", schedulerRunnable = " ++ show (schedulerRunnable s) + ++ ", schedulerTasks = " ++ show (schedulerTasks s) + ++ ", schedulerWaiters = " ++ show (schedulerWaiters s) + ++ ", schedulerSleepQueue = " ++ show (schedulerSleepQueue s) + ++ ", schedulerAsyncCompleted = " + ++ ", schedulerCompleted = " ++ show (schedulerCompleted s) + ++ " }" + +initialScheduler :: TVar (Map TaskId T) -> Machine -> Scheduler +initialScheduler asyncVar mainMachine = Scheduler { schedulerNextTaskId = 1 , schedulerRunnable = Seq.singleton (TaskId 0) , schedulerTasks = Map.singleton (TaskId 0) (Runnable mainMachine) + , schedulerWaiters = Map.empty + , schedulerSleepQueue = Map.empty + , schedulerAsyncCompleted = asyncVar + , schedulerCompleted = Map.empty } -runtimeOfStatus :: TaskStatus -> Runtime -runtimeOfStatus (Runnable machine) = machineRuntime machine -runtimeOfStatus (BlockedOn _ machine) = machineRuntime machine -runtimeOfStatus (Sleeping _ machine) = machineRuntime machine -runtimeOfStatus (Completed runtime _) = runtime +runtimeOfStatus :: TaskStatus -> Maybe Runtime +runtimeOfStatus (Runnable machine) = Just (machineRuntime machine) +runtimeOfStatus (BlockedOn _ machine) = Just (machineRuntime machine) +runtimeOfStatus (Sleeping _ machine) = Just (machineRuntime machine) +runtimeOfStatus (AsyncWaiting machine) = Just (machineRuntime machine) wakeAwaiters :: TaskId -> T -> Scheduler -> Scheduler wakeAwaiters targetId value scheduler = - let (newlyRunnable, tasks') = - Map.mapAccumWithKey wakeOne [] (schedulerTasks scheduler) - queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable) - in scheduler { schedulerTasks = tasks', schedulerRunnable = queue' } + case Map.lookup targetId (schedulerWaiters scheduler) of + Nothing -> scheduler + Just waiters -> + let (tasks', queue') = Fold.foldl' (wakeOne targetId value) + (schedulerTasks scheduler, schedulerRunnable scheduler) + waiters + in scheduler + { schedulerTasks = tasks' + , schedulerRunnable = queue' + , schedulerWaiters = Map.delete targetId (schedulerWaiters scheduler) + } where - wakeOne acc tid (BlockedOn blockedTarget machine) - | blockedTarget == targetId = + wakeOne _ _ (tasks, queue) waiterId = + case Map.lookup waiterId tasks of + Just (BlockedOn _ machine) -> let machine' = machine { machineCurrent = pureAction value } - in (tid : acc, Runnable machine') - wakeOne acc _ status = (acc, status) + in (Map.insert waiterId (Runnable machine') tasks, queue |> waiterId) + _ -> (tasks, queue) wakeDueSleepers :: Scheduler -> IO Scheduler wakeDueSleepers scheduler = do now <- getCurrentTime - let (newlyRunnable, tasks') = - Map.mapAccumWithKey (wakeOne now) [] (schedulerTasks scheduler) - queue' = foldl (|>) (schedulerRunnable scheduler) (reverse newlyRunnable) - pure scheduler { schedulerTasks = tasks', schedulerRunnable = queue' } - where - wakeOne now acc tid (Sleeping wakeTime machine) - | wakeTime <= now = (tid : acc, Runnable machine) - wakeOne _ acc _ status = (acc, status) + let go sq accTasks accQueue = + case Map.lookupMin sq of + Nothing -> (accTasks, accQueue, sq) + Just (t, taskSet) + | t <= now -> + let tasks' = Fold.foldl' (\m tid -> + case Map.lookup tid m of + Just (Sleeping _ machine) -> Map.insert tid (Runnable machine) m + _ -> m + ) accTasks (Set.toList taskSet) + queue' = Fold.foldl' (|>) accQueue (Set.toList taskSet) + in go (Map.deleteMin sq) tasks' queue' + | otherwise -> (accTasks, accQueue, sq) + (tasks', queue', sq') = go (schedulerSleepQueue scheduler) + (schedulerTasks scheduler) + (schedulerRunnable scheduler) + pure scheduler + { schedulerTasks = tasks' + , schedulerRunnable = queue' + , schedulerSleepQueue = sq' + } nearestSleepTime :: Scheduler -> Maybe UTCTime -nearestSleepTime = Map.foldl' minSleep Nothing . schedulerTasks +nearestSleepTime = fmap fst . Map.lookupMin . schedulerSleepQueue + +hasAsyncWaiters :: Scheduler -> Bool +hasAsyncWaiters = any isAsync . Map.elems . schedulerTasks where - minSleep acc (Sleeping t _) = Just $ maybe t (min t) acc - minSleep acc _ = acc + isAsync (AsyncWaiting _) = True + isAsync _ = False resumeCurrentWith :: TaskId -> T -> Machine -> Scheduler -> IO Scheduler resumeCurrentWith taskId value machine scheduler = @@ -575,6 +670,13 @@ resumeCurrentWith taskId value machine scheduler = , schedulerRunnable = schedulerRunnable scheduler |> taskId } +wouldCycle :: TaskId -> TaskId -> Map TaskId TaskStatus -> Bool +wouldCycle target current tasks = + case Map.lookup target tasks of + Just (BlockedOn next _) -> + next == current || wouldCycle next current tasks + _ -> False + handleStep :: TaskId -> Step -> Scheduler -> IO Scheduler handleStep taskId (Continue machine) scheduler = pure scheduler @@ -582,12 +684,12 @@ handleStep taskId (Continue machine) scheduler = , schedulerRunnable = schedulerRunnable scheduler |> taskId } -handleStep taskId (Halt _runtime value) scheduler = - pure (wakeAwaiters taskId value scheduler') - where - scheduler' = scheduler - { schedulerTasks = Map.insert taskId (Completed _runtime value) (schedulerTasks scheduler) - } +handleStep taskId (Halt runtime value) scheduler = + let scheduler' = wakeAwaiters taskId value scheduler + in pure scheduler' + { schedulerTasks = Map.delete taskId (schedulerTasks scheduler') + , schedulerCompleted = Map.insert taskId (value, rtState runtime) (schedulerCompleted scheduler') + } handleStep parentId (ForkRequested childAction parentMachine) scheduler = let childId = TaskId (schedulerNextTaskId scheduler) @@ -618,22 +720,29 @@ handleStep parentId (ForkRequested childAction parentMachine) scheduler = } handleStep currentId (AwaitRequested targetId machine) scheduler - | targetId == currentId = + | currentId == targetId = resumeCurrentWith currentId selfAwaitResult machine scheduler | otherwise = case Map.lookup targetId (schedulerTasks scheduler) of Nothing -> - resumeCurrentWith currentId invalidAsyncHandleResult machine scheduler + case Map.lookup targetId (schedulerCompleted scheduler) of + Just (value, _) -> resumeCurrentWith currentId value machine scheduler + Nothing -> resumeCurrentWith currentId invalidAsyncHandleResult machine scheduler - Just (Completed _ value) -> - resumeCurrentWith currentId value machine scheduler + Just (BlockedOn nextId _) -> + if wouldCycle targetId currentId (schedulerTasks scheduler) + then resumeCurrentWith currentId (errResult errCyclicAwait) machine scheduler + else block - Just _ -> - pure scheduler - { schedulerTasks = - Map.insert currentId (BlockedOn targetId machine) (schedulerTasks scheduler) - } + Just _ -> block + where + block = pure scheduler + { schedulerTasks = Map.insert currentId (BlockedOn targetId machine) (schedulerTasks scheduler) + , schedulerWaiters = Map.alter addWaiter targetId (schedulerWaiters scheduler) + } + addWaiter Nothing = Just (Seq.singleton currentId) + addWaiter (Just sq) = Just (sq |> currentId) handleStep taskId (YieldRequested machine) scheduler = resumeCurrentWith taskId Leaf machine scheduler @@ -644,8 +753,16 @@ handleStep taskId (SleepRequested ms machine) scheduler = do wakeTime = addUTCTime seconds now machine' = machine { machineCurrent = pureAction Leaf } pure scheduler - { schedulerTasks = - Map.insert taskId (Sleeping wakeTime machine') (schedulerTasks scheduler) + { schedulerTasks = Map.insert taskId (Sleeping wakeTime machine') (schedulerTasks scheduler) + , schedulerSleepQueue = Map.alter (Just . maybe (Set.singleton taskId) (Set.insert taskId)) wakeTime (schedulerSleepQueue scheduler) + } + +handleStep taskId (AsyncAction ioAction machine) scheduler = do + _ <- forkIO $ do + result <- ioAction + atomically $ modifyTVar' (schedulerAsyncCompleted scheduler) (Map.insert taskId result) + pure scheduler + { schedulerTasks = Map.insert taskId (AsyncWaiting machine) (schedulerTasks scheduler) } handleNoRunnable :: Scheduler -> IO Scheduler @@ -658,20 +775,42 @@ handleNoRunnable scheduler = wakeDueSleepers scheduler Nothing -> - case Map.lookup (TaskId 0) (schedulerTasks scheduler) of - Just status -> - pure scheduler - { schedulerTasks = - Map.insert (TaskId 0) - (Completed (runtimeOfStatus status) deadlockResult) - (schedulerTasks scheduler) - } - Nothing -> + if hasAsyncWaiters scheduler + then do + -- Block efficiently until at least one async operation completes. + atomically $ do + m <- readTVar (schedulerAsyncCompleted scheduler) + if Map.null m then retry else return () pure scheduler + else + case Map.lookup (TaskId 0) (schedulerTasks scheduler) of + Just status -> + case runtimeOfStatus status of + Just runtime -> + let scheduler' = wakeAwaiters (TaskId 0) deadlockResult scheduler + in pure scheduler' + { schedulerTasks = Map.delete (TaskId 0) (schedulerTasks scheduler') + , schedulerCompleted = Map.insert (TaskId 0) (deadlockResult, rtState runtime) (schedulerCompleted scheduler') + } + Nothing -> pure scheduler + Nothing -> pure scheduler schedulerStep :: Scheduler -> IO Scheduler schedulerStep scheduler = do - scheduler1 <- wakeDueSleepers scheduler + -- Poll completed async operations and resume their tasks. + completed <- atomically $ do + m <- readTVar (schedulerAsyncCompleted scheduler) + writeTVar (schedulerAsyncCompleted scheduler) Map.empty + return m + schedulerAfterAsync <- Fold.foldlM + (\s (tid, val) -> + case Map.lookup tid (schedulerTasks s) of + Just (AsyncWaiting machine) -> resumeCurrentWith tid val machine s + _ -> pure s) + scheduler + (Map.toList completed) + + scheduler1 <- wakeDueSleepers schedulerAfterAsync case Seq.viewl (schedulerRunnable scheduler1) of EmptyL -> handleNoRunnable scheduler1 @@ -687,9 +826,9 @@ schedulerStep scheduler = do runScheduler :: Scheduler -> IO (T, T) runScheduler scheduler = - case Map.lookup (TaskId 0) (schedulerTasks scheduler) of - Just (Completed runtime value) -> - pure (value, rtState runtime) + case Map.lookup (TaskId 0) (schedulerCompleted scheduler) of + Just (value, finalState) -> + pure (value, finalState) _ -> schedulerStep scheduler >>= runScheduler @@ -698,26 +837,29 @@ runScheduler scheduler = -- Public API -- --------------------------------------------------------------------------- -runIOWith :: IOPermissions -> T -> T -> T -> IO (T, T) +runIOWith :: IOPermissions -> T -> T -> T -> IO (Either String (T, T)) runIOWith perms env initialState action = - runScheduler (initialScheduler initialMachine) - where - initialMachine = Machine - { machineRuntime = Runtime - { rtPerms = perms - , rtEnv = env - , rtState = initialState - } - , machineCurrent = action - , machineFrames = [] - } + case checkIOSentinel action of + Left err -> pure (Left err) + Right (_, action') -> do + asyncVar <- newTVarIO Map.empty + let initialMachine = Machine + { machineRuntime = Runtime + { rtPerms = perms + , rtEnv = env + , rtState = initialState + } + , machineCurrent = action' + , machineFrames = [] + } + Right <$> runScheduler (initialScheduler asyncVar initialMachine) -runIOWithEnv :: IOPermissions -> T -> T -> IO T +runIOWithEnv :: IOPermissions -> T -> T -> IO (Either String T) runIOWithEnv perms env action = do - (result, _) <- runIOWith perms env Leaf action - pure result + result <- runIOWith perms env Leaf action + pure (fmap fst result) -runIO :: IOPermissions -> T -> IO T +runIO :: IOPermissions -> T -> IO (Either String T) runIO perms action = do - (result, _) <- runIOWith perms Leaf Leaf action - pure result + result <- runIOWith perms Leaf Leaf action + pure (fmap fst result) diff --git a/src/Main.hs b/src/Main.hs index f91a74b..e648423 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -5,7 +5,7 @@ import System.Exit (die) import Server (runServerWithPath) import Eval (evalTricu, evalTricuWithStore, mainResult, result) import FileEval (evaluateFileWithContext, evaluateFileWithStore, compileFile) -import IODriver (IOPermissions(..), checkIOSentinel, runIO) +import IODriver (IOPermissions(..), runIO) import Parser (parseTricu) import REPL (repl) import Research (T, EvaluatedForm(..), Env, formatT, exportDag) @@ -307,17 +307,17 @@ runEval opts = do finalEnv <- foldM (evaluateFileWithStore mconn) Map.empty files return $ mainResult finalEnv finalT <- if evalIo opts - then case checkIOSentinel resultT of - Right (1, action) -> do - let perms = IOPermissions - { allowRead = evalAllowRead opts - , allowWrite = evalAllowWrite opts - , allowReadAll = evalUnsafeIo opts || evalAllowReadAll opts - , allowWriteAll = evalUnsafeIo opts || evalAllowWriteAll opts - } - runIO perms action - Right (v, _) -> die $ "Unsupported IO ABI version: " ++ show v - Left err -> die $ "IO mode requested but " ++ err + then do + let perms = IOPermissions + { allowRead = evalAllowRead opts + , allowWrite = evalAllowWrite opts + , allowReadAll = evalUnsafeIo opts || evalAllowReadAll opts + , allowWriteAll = evalUnsafeIo opts || evalAllowWriteAll opts + } + result <- runIO perms resultT + case result of + Left err -> die $ "IO error: " ++ err + Right val -> pure val else return resultT case mconn of Just conn -> close conn diff --git a/test/Spec.hs b/test/Spec.hs index 5e7345e..c85d0dc 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1809,16 +1809,56 @@ ioDriverTests = testGroup "IO driver tests" ] final @?= ofString "child done" st @?= Leaf + + -- Scheduler hardening tests + , testCase "runIO rejects non-IO tree with sentinel error" $ do + result <- runIO unsafePerms (ofString "not an io program") + case result of + Left _ -> return () + Right _ -> assertFailure "Expected Left for invalid sentinel" + + , testCase "cyclic await returns error instead of hanging" $ do + (final, _) <- runIOSourceWith unsafePerms Leaf Leaf $ + unlines + [ "main = io (bind (fork (await (pair \"task\" 0))) (h :" + , " await h))" + ] + final @?= ioErrResult 63 + + , testCase "writeBytes and readFile roundtrip binary data" $ + withSystemTempDirectory "tricu-io-bytes" $ \dir -> do + let path = dir ++ "/binary.bin" + final <- runIOSource $ + unlines + [ "main = io (bind (writeBytes \"" ++ path ++ "\" [(0) (255) (128) (1)])" + , " (_ : readFile \"" ++ path ++ "\"))" + ] + final @?= ioOkResult (ofBytes (BS.pack [0, 255, 128, 1])) + + , testCase "stress test: many sleeping tasks complete promptly" $ do + let n = 100 + build 0 = "pure \"done\"" + build k = "bind (fork (bind (sleep 1) (_ : pure \"x\"))) (h : bind (await h) (_ : " ++ build (k - 1) ++ "))" + (final, _) <- runIOSourceWith unsafePerms Leaf Leaf ("main = io (" ++ build n ++ ")") + final @?= ofString "done" + + , testCase "long fork await loop does not leak" $ do + let n = 200 + build 0 = "pure \"done\"" + build k = "bind (fork (pure \"x\")) (h : bind (await h) (_ : " ++ build (k - 1) ++ "))" + (final, _) <- runIOSourceWith unsafePerms Leaf Leaf ("main = io (" ++ build n ++ ")") + final @?= ofString "done" ] runIOSourceWith :: IOPermissions -> T -> T -> String -> IO (T, T) runIOSourceWith perms readerEnv initialState source = do ioEnv <- evaluateFile "./lib/io.tri" evalEnv <- evalTricuWithStore Nothing ioEnv (parseTricu source) - case checkIOSentinel (mainResult evalEnv) of - Right (1, action) -> runIOWith perms readerEnv initialState action - Right (v, _) -> assertFailure ("Unsupported IO ABI version: " ++ show v) - Left err -> assertFailure ("Expected IO sentinel: " ++ err) + let fullTree = mainResult evalEnv + result <- runIOWith perms readerEnv initialState fullTree + case result of + Left err -> assertFailure ("IO runtime error: " ++ err) + Right pair -> pure pair runIOSource :: String -> IO T runIOSource source = fmap fst $ runIOSourceWith unsafePerms Leaf Leaf source