{-
  Copyright (c) Meta Platforms, Inc. and affiliates.
  All rights reserved.

  This source code is licensed under the BSD-style license found in the
  LICENSE file in the root directory of this source tree.
-}

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}

-- | Defines 'runHaxl'.  Most users should import "Haxl.Core" instead.
--
module Haxl.Core.Run
  ( runHaxl
  , runHaxlWithWrites
  ) where

import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.IORef
import Data.Maybe
import Text.Printf
import Unsafe.Coerce

import Haxl.Core.DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Fetch
import Haxl.Core.Profile
import Haxl.Core.RequestStore as RequestStore
import Haxl.Core.Stats
import Haxl.Core.Util

import qualified Data.HashTable.IO as H

-- -----------------------------------------------------------------------------
-- runHaxl

-- | Runs a 'Haxl' computation in the given 'Env'.
--
-- This is 'runHaxlWithWrites' with the accumulated writes thrown away:
-- only the first component of its result is returned.
--
-- Note: to make multiple concurrent calls to 'runHaxl', each one must
-- have a separate 'Env'. A single 'Env' must /not/ be shared between
-- multiple concurrent calls to 'runHaxl', otherwise deadlocks or worse
-- will likely ensue.
--
-- However, multiple 'Env's may share a single 'StateStore', and thereby
-- use the same set of datasources.
runHaxl :: forall u w a. Monoid w => Env u w -> GenHaxl u w a -> IO a
runHaxl env haxl = fst <$> runHaxlWithWrites env haxl

-- | Runs a 'Haxl' computation in the given 'Env' and returns its result
-- paired with the writes collected during the run.
--
-- The computation is driven by a small scheduler made of the local
-- functions below: 'schedule' runs one job, 'reschedule' picks the next
-- job, and when nothing is runnable 'emptyRunQueue' / 'checkRequestStore' /
-- 'waitCompletions' collect finished fetches, submit pending requests, or
-- block until a fetch completes.
--
-- Once the scheduler returns, 'writeLogsRef' is reset to 'mempty' and
-- 'writeLogsRefNoMemo' is drained.  On success the result is returned with
-- the writes stored alongside it, followed by the drained no-memo writes.
-- A Haxl or IO exception stored in the result is rethrown with 'throwIO'
-- (the writes are discarded in that case).  A 'CriticalError' is thrown if
-- the scheduler returns without having filled the result 'IVar'.
runHaxlWithWrites :: forall u w a. Monoid w => Env u w -> GenHaxl u w a -> IO (a, w)
runHaxlWithWrites env@Env{..} haxl = do
  result@IVar{ivarRef = resultRef} <- newIVar -- where to put the final result
  -- Trace logger: a no-op unless the trace level is at least 3, otherwise
  -- prints each message prefixed with the time since the run started.
  ifTraceLog <- do
    if trace flags < 3
    then return $ \_ -> return ()
    else do
      start <- getTimestamp
      return $ \s -> do
        now <- getTimestamp
        let t = fromIntegral (now - start) / 1000.0 :: Double
        printf "%.1fms: %s" t (s :: String)
  let
    -- Run a job, and put its result in the given IVar
    schedule :: Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
    schedule env@Env{..} rq (GenHaxl run) ivar@IVar{ivarRef = !ref} = do
      ifTraceLog $ printf "schedule: %d\n" (1 + lengthJobList rq)
      let {-# INLINE result #-}
          -- Store a finished job's outcome in its IVar and carry on
          -- scheduling (or stop, if this was the final result).
          result r = do
            e <- readIORef ref
            case e of
              IVarFull _ ->
                -- An IVar is typically only meant to be written to once
                -- so it would make sense to throw an error here. But there
                -- are legitimate use-cases for writing several times.
                -- (See Haxl.Core.Parallel)
                reschedule env rq
              IVarEmpty haxls -> do
                writeIORef ref (IVarFull r)
                -- Have we got the final result now?
                if ref == unsafeCoerce resultRef
                        -- comparing IORefs of different types is safe, it's
                        -- pointer-equality on the MutVar#.
                   then
                     -- We have a result, but don't discard unfinished
                     -- computations in the run queue. See
                     -- Note [runHaxl and unfinished requests].
                     -- Nothing can depend on the final IVar, so haxls must
                     -- be empty.
                     case rq of
                       JobNil -> return ()
                       _ -> modifyIORef' runQueueRef (appendJobList rq)
                   else reschedule env (appendJobList haxls rq)
      r <-
        if testReportFlag ReportProfiling $ report flags  -- withLabel unfolded
          then Exception.try $ profileCont run env
          else Exception.try $ run env
      case r of
        Left e -> do
          -- Asynchronous exceptions propagate; anything else becomes the
          -- job's result.
          rethrowAsyncExceptions e
          result (ThrowIO e)
        Right (Done a) -> do
          wt <- readIORef writeLogsRef
          result $ Ok a (Just wt)
        Right (Throw ex) -> do
          wt <- readIORef writeLogsRef
          result $ ThrowHaxl ex (Just wt)
        Right (Blocked i fn) -> do
          -- Park the continuation on the IVar it is blocked on and move on
          -- to the remaining jobs.
          addJob env (toHaxl fn) ivar i
          reschedule env rq

    -- Here we have a choice:
    --   - If the requestStore is non-empty, we could submit those
    --     requests right away without waiting for more.  This might
    --     be good for latency, especially if the data source doesn't
    --     support batching, or if batching is pessimal.
    --   - To optimise the batch sizes, we want to execute as much as
    --     we can and only submit requests when we have no more
    --     computation to do.
    --   - compromise: wait at least Nms for an outstanding result
    --     before giving up and submitting new requests.
    --
    -- For now we use the batching strategy in the scheduler, but
    -- individual data sources can request that their requests are
    -- sent eagerly by using schedulerHint.
    --
    -- Concretely: run the head of the supplied job list if there is one;
    -- otherwise take over whatever is in runQueueRef; otherwise fall
    -- through to emptyRunQueue.
    reschedule :: Env u w -> JobList u w -> IO ()
    reschedule env@Env{..} haxls = do
      case haxls of
        JobNil -> do
          rq <- readIORef runQueueRef
          case rq of
            JobNil -> emptyRunQueue env
            JobCons env' a b c -> do
              writeIORef runQueueRef JobNil
              schedule env' c a b
        JobCons env' a b c ->
          schedule env' c a b

    -- Nothing is runnable: resume any jobs unblocked by completed
    -- requests, or, if there are none, go and look at the request store.
    emptyRunQueue :: Env u w -> IO ()
    emptyRunQueue env = do
      ifTraceLog $ printf "emptyRunQueue\n"
      haxls <- checkCompletions env
      case haxls of
        JobNil -> checkRequestStore env
        _ -> reschedule env haxls

    -- Submit the pending requests, if any, and go round again; with an
    -- empty request store, wait for an outstanding request to complete.
    checkRequestStore :: Env u w -> IO ()
    checkRequestStore env@Env{..} = do
      ifTraceLog $ printf "checkRequestStore\n"
      reqStore <- readIORef reqStoreRef
      if RequestStore.isEmpty reqStore
        then waitCompletions env
        else do
          ifTraceLog $ printf "performFetches %d\n" (RequestStore.getSize reqStore)
          writeIORef reqStoreRef noRequests
          performRequestStore env reqStore
          -- empty the cache if we're not caching.  Is this the best
          -- place to do it?  We do get to de-duplicate requests that
          -- happen simultaneously.
          when (caching flags == 0) $ do
            let DataCache dc = dataCache
            H.foldM (\_ (k, _) -> H.delete dc k) () dc
          emptyRunQueue env

    -- Atomically take everything off the completions queue, fill the
    -- corresponding IVars, and return the jobs that were waiting on them.
    checkCompletions :: Env u w -> IO (JobList u w)
    checkCompletions Env{..} = do
      ifTraceLog $ printf "checkCompletions\n"
      comps <- atomicallyOnBlocking (LogicBug ReadingCompletionsFailedRun) $ do
        c <- readTVar completions
        writeTVar completions []
        return c
      case comps of
        [] -> return JobNil
        _ -> do
          ifTraceLog $ printf "%d complete\n" (length comps)
          let
              getComplete (CompleteReq a IVar{ivarRef = !cr} allocs) = do
                -- A negative count recorded with the completion is added
                -- to this thread's allocation counter.
                when (allocs < 0) $ do
                  cur <- getAllocationCounter
                  setAllocationCounter (cur + allocs)
                r <- readIORef cr
                case r of
                  IVarFull _ -> do
                    ifTraceLog $ printf "existing result\n"
                    return JobNil
                    -- this happens if a data source reports a result,
                    -- and then throws an exception.  We call putResult
                    -- a second time for the exception, which comes
                    -- ahead of the original request (because it is
                    -- pushed on the front of the completions list) and
                    -- therefore overrides it.
                  IVarEmpty cv -> do
                    writeIORef cr (IVarFull a)
                    return cv
          jobs <- mapM getComplete comps
          return (foldr appendJobList JobNil jobs)

    -- Block until the completions queue is non-empty, then go back to
    -- emptyRunQueue.  When ReportFetchStats is enabled, the time spent
    -- blocked is recorded in the stats as a FetchWait entry.
    waitCompletions :: Env u w -> IO ()
    waitCompletions env@Env{..} = do
      ifTraceLog $ printf "waitCompletions\n"
      let
        wrapped = atomicallyOnBlocking (LogicBug ReadingCompletionsFailedRun)
        doWait = wrapped $ do
          c <- readTVar completions
          when (null c) retry
        doWaitProfiled = do
          queueEmpty <- null <$> wrapped (readTVar completions)
          when queueEmpty $ do
            -- Double check the queue as we want to make sure that
            -- submittedReqsRef is copied before waiting on the queue but as a
            -- fast path do not want to copy it if the queue is empty.
            -- There is still a race opportunity as submittedReqsRef is
            -- decremented in whatever thread the completion happens, and so it
            -- is possible for waitingOn to be empty while queueEmpty2 is True.
            waitingOn <- readIORef submittedReqsRef
            queueEmpty2 <- null <$> wrapped (readTVar completions)
            when queueEmpty2 $ do
              start <- getTimestamp
              doWait
              end <- getTimestamp
              let fw = FetchWait
                        { fetchWaitReqs = getSummaryMapFromRCMap waitingOn
                        , fetchWaitStart = start
                        , fetchWaitDuration = (end - start)
                        }
              modifyIORef' statsRef $ \(Stats s) -> Stats (fw : s)
      if testReportFlag ReportFetchStats $ report flags
        then doWaitProfiled
        else doWait
      emptyRunQueue env

  --
  schedule env JobNil haxl result
  r <- readIORef resultRef
  -- Reset the write logs in the Env before handing the writes back.
  writeIORef writeLogsRef mempty
  wtNoMemo <- atomicModifyIORef' writeLogsRefNoMemo
    (\old_wrts -> (mempty, old_wrts))
  case r of
    IVarEmpty _ -> throwIO (CriticalError "runHaxl: missing result")
    IVarFull (Ok a wt) ->
      return (a, fromMaybe mempty wt <> wtNoMemo)
    IVarFull (ThrowHaxl e _wt)  -> throwIO e
      -- The written logs are discarded when there's a Haxl exception. We
      -- can change this behavior if we need to get access to partial logs.
    IVarFull (ThrowIO e)  -> throwIO e


{- Note [runHaxl and unfinished requests]

runHaxl returns immediately when the supplied computation has returned
a result.  This doesn't necessarily mean that the whole computation
graph has completed, however.  In particular, when using pAnd and pOr,
we might have created some data fetches that have not completed, but
weren't required, because the other branch of the pAnd/pOr subsumed
the result.

When runHaxl returns, it might be that:
- reqStoreRef contains some unsubmitted requests
- runQueueRef contains some jobs
- there are in-flight BackgroundFetch requests, that will return their
  results to the completions queue in due course.
- there are various unfilled IVars in the cache and/or memo tables

This should all be safe; we can even restart runHaxl with the same Env
after it has stopped and the in-progress computations will
continue. But don't discard the contents of
reqStoreRef/runQueueRef/completions, because then we'll deadlock if we
discover one of the unfilled IVars in the cache or memo table.
-}

{- TODO: later
data SchedPolicy
  = SubmitImmediately
  | WaitAtLeast Int{-ms-}
  | WaitForAllPendingRequests
-}

-- | An exception thrown when reading from datasources fails.
--
-- In this module it is wrapped in 'LogicBug' and passed to
-- 'atomicallyOnBlocking' around the STM transactions that read the
-- completions queue.
data ReadingCompletionsFailedRun = ReadingCompletionsFailedRun
  deriving Show

-- No methods are overridden, so the class defaults apply.  This instance is
-- what allows the value to be wrapped in 'LogicBug' where the completions
-- queue is read.
instance Exception ReadingCompletionsFailedRun