{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
module Database.Redis.Cluster
( Connection(..)
, NodeRole(..)
, NodeConnection(..)
, Node(..)
, ShardMap(..)
, HashSlot
, Shard(..)
, connect
, disconnect
, requestPipelined
, nodes
) where
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..))
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
import Control.Monad(zipWithM, when, replicateM)
import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot)
import qualified Database.Redis.ConnectionContext as CC
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IntMap
import Data.Typeable
import qualified Scanner
import System.IO.Unsafe(unsafeInterleaveIO)
import Database.Redis.Protocol(Reply(Error), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
instance Eq NodeConnection where
(NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id1) == :: NodeConnection -> NodeConnection -> Bool
== (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id2) = NodeID
id1 NodeID -> NodeID -> Bool
forall a. Eq a => a -> a -> Bool
== NodeID
id2
instance Ord NodeConnection where
compare :: NodeConnection -> NodeConnection -> Ordering
compare (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id1) (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id2) = NodeID -> NodeID -> Ordering
forall a. Ord a => a -> a -> Ordering
compare NodeID
id1 NodeID
id2
data PipelineState =
Pending [[B.ByteString]]
| Executed [Reply]
| TransactionPending [[B.ByteString]]
newtype Pipeline = Pipeline (MVar PipelineState)
data NodeRole = Master | Slave deriving (Port -> NodeRole -> ShowS
[NodeRole] -> ShowS
NodeRole -> Host
(Port -> NodeRole -> ShowS)
-> (NodeRole -> Host) -> ([NodeRole] -> ShowS) -> Show NodeRole
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> NodeRole -> ShowS
showsPrec :: Port -> NodeRole -> ShowS
$cshow :: NodeRole -> Host
show :: NodeRole -> Host
$cshowList :: [NodeRole] -> ShowS
showList :: [NodeRole] -> ShowS
Show, NodeRole -> NodeRole -> Bool
(NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool) -> Eq NodeRole
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NodeRole -> NodeRole -> Bool
== :: NodeRole -> NodeRole -> Bool
$c/= :: NodeRole -> NodeRole -> Bool
/= :: NodeRole -> NodeRole -> Bool
Eq, Eq NodeRole
Eq NodeRole =>
(NodeRole -> NodeRole -> Ordering)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> NodeRole)
-> (NodeRole -> NodeRole -> NodeRole)
-> Ord NodeRole
NodeRole -> NodeRole -> Bool
NodeRole -> NodeRole -> Ordering
NodeRole -> NodeRole -> NodeRole
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: NodeRole -> NodeRole -> Ordering
compare :: NodeRole -> NodeRole -> Ordering
$c< :: NodeRole -> NodeRole -> Bool
< :: NodeRole -> NodeRole -> Bool
$c<= :: NodeRole -> NodeRole -> Bool
<= :: NodeRole -> NodeRole -> Bool
$c> :: NodeRole -> NodeRole -> Bool
> :: NodeRole -> NodeRole -> Bool
$c>= :: NodeRole -> NodeRole -> Bool
>= :: NodeRole -> NodeRole -> Bool
$cmax :: NodeRole -> NodeRole -> NodeRole
max :: NodeRole -> NodeRole -> NodeRole
$cmin :: NodeRole -> NodeRole -> NodeRole
min :: NodeRole -> NodeRole -> NodeRole
Ord)
type Host = String
type Port = Int
type NodeID = B.ByteString
data Node = Node NodeID NodeRole Host Port deriving (Port -> Node -> ShowS
[Node] -> ShowS
Node -> Host
(Port -> Node -> ShowS)
-> (Node -> Host) -> ([Node] -> ShowS) -> Show Node
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> Node -> ShowS
showsPrec :: Port -> Node -> ShowS
$cshow :: Node -> Host
show :: Node -> Host
$cshowList :: [Node] -> ShowS
showList :: [Node] -> ShowS
Show, Node -> Node -> Bool
(Node -> Node -> Bool) -> (Node -> Node -> Bool) -> Eq Node
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Node -> Node -> Bool
== :: Node -> Node -> Bool
$c/= :: Node -> Node -> Bool
/= :: Node -> Node -> Bool
Eq, Eq Node
Eq Node =>
(Node -> Node -> Ordering)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Node)
-> (Node -> Node -> Node)
-> Ord Node
Node -> Node -> Bool
Node -> Node -> Ordering
Node -> Node -> Node
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Node -> Node -> Ordering
compare :: Node -> Node -> Ordering
$c< :: Node -> Node -> Bool
< :: Node -> Node -> Bool
$c<= :: Node -> Node -> Bool
<= :: Node -> Node -> Bool
$c> :: Node -> Node -> Bool
> :: Node -> Node -> Bool
$c>= :: Node -> Node -> Bool
>= :: Node -> Node -> Bool
$cmax :: Node -> Node -> Node
max :: Node -> Node -> Node
$cmin :: Node -> Node -> Node
min :: Node -> Node -> Node
Ord)
type MasterNode = Node
type SlaveNode = Node
data Shard = Shard MasterNode [SlaveNode] deriving (Port -> Shard -> ShowS
[Shard] -> ShowS
Shard -> Host
(Port -> Shard -> ShowS)
-> (Shard -> Host) -> ([Shard] -> ShowS) -> Show Shard
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> Shard -> ShowS
showsPrec :: Port -> Shard -> ShowS
$cshow :: Shard -> Host
show :: Shard -> Host
$cshowList :: [Shard] -> ShowS
showList :: [Shard] -> ShowS
Show, Shard -> Shard -> Bool
(Shard -> Shard -> Bool) -> (Shard -> Shard -> Bool) -> Eq Shard
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Shard -> Shard -> Bool
== :: Shard -> Shard -> Bool
$c/= :: Shard -> Shard -> Bool
/= :: Shard -> Shard -> Bool
Eq, Eq Shard
Eq Shard =>
(Shard -> Shard -> Ordering)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Shard)
-> (Shard -> Shard -> Shard)
-> Ord Shard
Shard -> Shard -> Bool
Shard -> Shard -> Ordering
Shard -> Shard -> Shard
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Shard -> Shard -> Ordering
compare :: Shard -> Shard -> Ordering
$c< :: Shard -> Shard -> Bool
< :: Shard -> Shard -> Bool
$c<= :: Shard -> Shard -> Bool
<= :: Shard -> Shard -> Bool
$c> :: Shard -> Shard -> Bool
> :: Shard -> Shard -> Bool
$c>= :: Shard -> Shard -> Bool
>= :: Shard -> Shard -> Bool
$cmax :: Shard -> Shard -> Shard
max :: Shard -> Shard -> Shard
$cmin :: Shard -> Shard -> Shard
min :: Shard -> Shard -> Shard
Ord)
newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Port -> ShardMap -> ShowS
[ShardMap] -> ShowS
ShardMap -> Host
(Port -> ShardMap -> ShowS)
-> (ShardMap -> Host) -> ([ShardMap] -> ShowS) -> Show ShardMap
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> ShardMap -> ShowS
showsPrec :: Port -> ShardMap -> ShowS
$cshow :: ShardMap -> Host
show :: ShardMap -> Host
$cshowList :: [ShardMap] -> ShowS
showList :: [ShardMap] -> ShowS
Show)
newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Port -> MissingNodeException -> ShowS
[MissingNodeException] -> ShowS
MissingNodeException -> Host
(Port -> MissingNodeException -> ShowS)
-> (MissingNodeException -> Host)
-> ([MissingNodeException] -> ShowS)
-> Show MissingNodeException
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> MissingNodeException -> ShowS
showsPrec :: Port -> MissingNodeException -> ShowS
$cshow :: MissingNodeException -> Host
show :: MissingNodeException -> Host
$cshowList :: [MissingNodeException] -> ShowS
showList :: [MissingNodeException] -> ShowS
Show, Typeable)
instance Exception MissingNodeException
newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Port -> UnsupportedClusterCommandException -> ShowS
[UnsupportedClusterCommandException] -> ShowS
UnsupportedClusterCommandException -> Host
(Port -> UnsupportedClusterCommandException -> ShowS)
-> (UnsupportedClusterCommandException -> Host)
-> ([UnsupportedClusterCommandException] -> ShowS)
-> Show UnsupportedClusterCommandException
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> UnsupportedClusterCommandException -> ShowS
showsPrec :: Port -> UnsupportedClusterCommandException -> ShowS
$cshow :: UnsupportedClusterCommandException -> Host
show :: UnsupportedClusterCommandException -> Host
$cshowList :: [UnsupportedClusterCommandException] -> ShowS
showList :: [UnsupportedClusterCommandException] -> ShowS
Show, Typeable)
instance Exception UnsupportedClusterCommandException
newtype CrossSlotException = CrossSlotException [[B.ByteString]] deriving (Port -> CrossSlotException -> ShowS
[CrossSlotException] -> ShowS
CrossSlotException -> Host
(Port -> CrossSlotException -> ShowS)
-> (CrossSlotException -> Host)
-> ([CrossSlotException] -> ShowS)
-> Show CrossSlotException
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Port -> CrossSlotException -> ShowS
showsPrec :: Port -> CrossSlotException -> ShowS
$cshow :: CrossSlotException -> Host
show :: CrossSlotException -> Host
$cshowList :: [CrossSlotException] -> ShowS
showList :: [CrossSlotException] -> ShowS
Show, Typeable)
instance Exception CrossSlotException
connect :: [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
connect :: [CommandInfo] -> MVar ShardMap -> Maybe Port -> IO Connection
connect [CommandInfo]
commandInfos MVar ShardMap
shardMapVar Maybe Port
timeoutOpt = do
shardMap <- MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
nodeConns <- nodeConnections shardMap
return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) where
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
nodeConnections :: ShardMap -> IO (HashMap NodeID NodeConnection)
nodeConnections ShardMap
shardMap = [(NodeID, NodeConnection)] -> HashMap NodeID NodeConnection
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList ([(NodeID, NodeConnection)] -> HashMap NodeID NodeConnection)
-> IO [(NodeID, NodeConnection)]
-> IO (HashMap NodeID NodeConnection)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Node -> IO (NodeID, NodeConnection))
-> [Node] -> IO [(NodeID, NodeConnection)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM Node -> IO (NodeID, NodeConnection)
connectNode ([Node] -> [Node]
forall a. Eq a => [a] -> [a]
nub ([Node] -> [Node]) -> [Node] -> [Node]
forall a b. (a -> b) -> a -> b
$ ShardMap -> [Node]
nodes ShardMap
shardMap)
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node NodeID
n NodeRole
_ Host
host Port
port) = do
ctx <- Host -> PortID -> Maybe Port -> IO ConnectionContext
CC.connect Host
host (PortNumber -> PortID
CC.PortNumber (PortNumber -> PortID) -> PortNumber -> PortID
forall a b. (a -> b) -> a -> b
$ Port -> PortNumber
forall a. Enum a => Port -> a
toEnum Port
port) Maybe Port
timeoutOpt
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect (Connection HashMap NodeID NodeConnection
nodeConnMap MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = (NodeConnection -> IO ()) -> [NodeConnection] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ NodeConnection -> IO ()
disconnectNode (HashMap NodeID NodeConnection -> [NodeConnection]
forall k v. HashMap k v -> [v]
HM.elems HashMap NodeID NodeConnection
nodeConnMap) where
disconnectNode :: NodeConnection -> IO ()
disconnectNode (NodeConnection ConnectionContext
nodeCtx IORef (Maybe NodeID)
_ NodeID
_) = ConnectionContext -> IO ()
CC.disconnect ConnectionContext
nodeCtx
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined :: IO ShardMap -> Connection -> [NodeID] -> IO Reply
requestPipelined IO ShardMap
refreshAction conn :: Connection
conn@(Connection HashMap NodeID NodeConnection
_ MVar Pipeline
pipelineVar MVar ShardMap
shardMapVar InfoMap
_) [NodeID]
nextRequest = MVar Pipeline -> (Pipeline -> IO (Pipeline, Reply)) -> IO Reply
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Pipeline
pipelineVar ((Pipeline -> IO (Pipeline, Reply)) -> IO Reply)
-> (Pipeline -> IO (Pipeline, Reply)) -> IO Reply
forall a b. (a -> b) -> a -> b
$ \(Pipeline MVar PipelineState
stateVar) -> do
(newStateVar, repliesIndex) <- IO (MVar PipelineState, Port) -> IO (MVar PipelineState, Port)
forall a. IO a -> IO a
hasLocked (IO (MVar PipelineState, Port) -> IO (MVar PipelineState, Port))
-> IO (MVar PipelineState, Port) -> IO (MVar PipelineState, Port)
forall a b. (a -> b) -> a -> b
$ MVar PipelineState
-> (PipelineState
-> IO (PipelineState, (MVar PipelineState, Port)))
-> IO (MVar PipelineState, Port)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar PipelineState
stateVar ((PipelineState -> IO (PipelineState, (MVar PipelineState, Port)))
-> IO (MVar PipelineState, Port))
-> (PipelineState
-> IO (PipelineState, (MVar PipelineState, Port)))
-> IO (MVar PipelineState, Port)
forall a b. (a -> b) -> a -> b
$ \case
Pending [[NodeID]]
requests | [NodeID] -> Bool
isMulti [NodeID]
nextRequest -> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
s' <- newMVar $ TransactionPending [nextRequest]
return (Executed replies, (s', 0))
Pending [[NodeID]]
requests | [[NodeID]] -> Port
forall a. [a] -> Port
forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
> Port
1000 -> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([NodeID]
nextRequest[NodeID] -> [[NodeID]] -> [[NodeID]]
forall a. a -> [a] -> [a]
:[[NodeID]]
requests)
return (Executed replies, (stateVar, length requests))
Pending [[NodeID]]
requests ->
(PipelineState, (MVar PipelineState, Port))
-> IO (PipelineState, (MVar PipelineState, Port))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([[NodeID]] -> PipelineState
Pending ([NodeID]
nextRequest[NodeID] -> [[NodeID]] -> [[NodeID]]
forall a. a -> [a] -> [a]
:[[NodeID]]
requests), (MVar PipelineState
stateVar, [[NodeID]] -> Port
forall a. [a] -> Port
forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
TransactionPending [[NodeID]]
requests ->
if [NodeID] -> Bool
isExec [NodeID]
nextRequest then do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([NodeID]
nextRequest[NodeID] -> [[NodeID]] -> [[NodeID]]
forall a. a -> [a] -> [a]
:[[NodeID]]
requests)
return (Executed replies, (stateVar, length requests))
else
(PipelineState, (MVar PipelineState, Port))
-> IO (PipelineState, (MVar PipelineState, Port))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([[NodeID]] -> PipelineState
TransactionPending ([NodeID]
nextRequest[NodeID] -> [[NodeID]] -> [[NodeID]]
forall a. a -> [a] -> [a]
:[[NodeID]]
requests), (MVar PipelineState
stateVar, [[NodeID]] -> Port
forall a. [a] -> Port
forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
e :: PipelineState
e@(Executed [Reply]
_) -> do
s' <- PipelineState -> IO (MVar PipelineState)
forall a. a -> IO (MVar a)
newMVar (PipelineState -> IO (MVar PipelineState))
-> PipelineState -> IO (MVar PipelineState)
forall a b. (a -> b) -> a -> b
$
if [NodeID] -> Bool
isMulti [NodeID]
nextRequest then
[[NodeID]] -> PipelineState
TransactionPending [[NodeID]
nextRequest]
else
[[NodeID]] -> PipelineState
Pending [[NodeID]
nextRequest]
return (e, (s', 0))
evaluateAction <- unsafeInterleaveIO $ do
replies <- hasLocked $ modifyMVar newStateVar $ \case
Executed [Reply]
replies ->
(PipelineState, [Reply]) -> IO (PipelineState, [Reply])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
replies)
Pending [[NodeID]]
requests-> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
return (Executed replies, replies)
TransactionPending [[NodeID]]
requests-> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
return (Executed replies, replies)
return $ replies !! repliesIndex
return (Pipeline newStateVar, evaluateAction)
isMulti :: [B.ByteString] -> Bool
isMulti :: [NodeID] -> Bool
isMulti (NodeID
"MULTI" : [NodeID]
_) = Bool
True
isMulti [NodeID]
_ = Bool
False
isExec :: [B.ByteString] -> Bool
isExec :: [NodeID] -> Bool
isExec (NodeID
"EXEC" : [NodeID]
_) = Bool
True
isExec [NodeID]
_ = Bool
False
data PendingRequest = PendingRequest Int [B.ByteString]
data CompletedRequest = CompletedRequest Int [B.ByteString] Reply
rawRequest :: PendingRequest -> [B.ByteString]
rawRequest :: PendingRequest -> [NodeID]
rawRequest (PendingRequest Port
_ [NodeID]
r) = [NodeID]
r
responseIndex :: CompletedRequest -> Int
responseIndex :: CompletedRequest -> Port
responseIndex (CompletedRequest Port
i [NodeID]
_ Reply
_) = Port
i
rawResponse :: CompletedRequest -> Reply
rawResponse :: CompletedRequest -> Reply
rawResponse (CompletedRequest Port
_ [NodeID]
_ Reply
r) = Reply
r
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[NodeID]]
requests = do
shardMap <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
requestsByNode <- getRequestsByNode shardMap
resps <- concat <$> mapM (uncurry executeRequests) requestsByNode
when (any (moved . rawResponse) resps) refreshShardMapVar
retriedResps <- mapM (retry 0) resps
return $ map rawResponse $ sortBy (on compare responseIndex) retriedResps
where
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode ShardMap
shardMap = do
commandsWithNodes <- (Port -> [NodeID] -> IO [(NodeConnection, [PendingRequest])])
-> [Port]
-> [[NodeID]]
-> IO [[(NodeConnection, [PendingRequest])]]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ShardMap
-> Port -> [NodeID] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap) ([Port] -> [Port]
forall a. [a] -> [a]
reverse [Port
0..([[NodeID]] -> Port
forall a. [a] -> Port
forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests Port -> Port -> Port
forall a. Num a => a -> a -> a
- Port
1)]) [[NodeID]]
requests
return $ assocs $ fromListWith (++) (mconcat commandsWithNodes)
requestWithNodes :: ShardMap -> Int -> [B.ByteString] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes :: ShardMap
-> Port -> [NodeID] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap Port
index [NodeID]
request = do
nodeConns <- Connection -> ShardMap -> [NodeID] -> IO [NodeConnection]
nodeConnectionForCommand Connection
conn ShardMap
shardMap [NodeID]
request
return $ (, [PendingRequest index request]) <$> nodeConns
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests NodeConnection
nodeConn [PendingRequest]
nodeRequests = do
replies <- NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
nodeConn ([[NodeID]] -> IO [Reply]) -> [[NodeID]] -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ (PendingRequest -> [NodeID]) -> [PendingRequest] -> [[NodeID]]
forall a b. (a -> b) -> [a] -> [b]
map PendingRequest -> [NodeID]
rawRequest [PendingRequest]
nodeRequests
return $ zipWith (curry (\(PendingRequest Port
i [NodeID]
r, Reply
rep) -> Port -> [NodeID] -> Reply -> CompletedRequest
CompletedRequest Port
i [NodeID]
r Reply
rep)) nodeRequests replies
retry :: Int -> CompletedRequest -> IO CompletedRequest
retry :: Port -> CompletedRequest -> IO CompletedRequest
retry Port
retryCount (CompletedRequest Port
index [NodeID]
request Reply
thisReply) = do
retryReply <- [Reply] -> Reply
forall a. HasCallStack => [a] -> a
head ([Reply] -> Reply) -> IO [Reply] -> IO Reply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Port
retryCount [[NodeID]
request] [Reply
thisReply]
return (CompletedRequest index request retryReply)
refreshShardMapVar :: IO ()
refreshShardMapVar :: IO ()
refreshShardMapVar = IO () -> IO ()
forall a. IO a -> IO a
hasLocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> (ShardMap -> IO ShardMap) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (IO ShardMap -> ShardMap -> IO ShardMap
forall a b. a -> b -> a
const IO ShardMap
refreshShardmapAction)
retryBatch :: MVar ShardMap -> IO ShardMap -> Connection -> Int -> [[B.ByteString]] -> [Reply] -> IO [Reply]
retryBatch :: MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Port
retryCount [[NodeID]]
requests [Reply]
replies =
case [Reply] -> Reply
forall a. HasCallStack => [a] -> a
last [Reply]
replies of
(Error NodeID
errString) | NodeID -> NodeID -> Bool
B.isPrefixOf NodeID
"MOVED" NodeID
errString -> do
let (Connection HashMap NodeID NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
conn
keys <- [[NodeID]] -> [NodeID]
forall a. Monoid a => [a] -> a
mconcat ([[NodeID]] -> [NodeID]) -> IO [[NodeID]] -> IO [NodeID]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([NodeID] -> IO [NodeID]) -> [[NodeID]] -> IO [[NodeID]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap) [[NodeID]]
requests
hashSlot <- hashSlotForKeys (CrossSlotException requests) keys
nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException (head requests)) hashSlot
requestNode nodeConn requests
(Reply -> Maybe (Host, Port)
askingRedirection -> Just (Host
host, Port
port)) -> do
shardMap <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
let maybeAskNode = ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap Connection
conn Host
host Port
port
case maybeAskNode of
Just NodeConnection
askNode -> [Reply] -> [Reply]
forall a. HasCallStack => [a] -> [a]
tail ([Reply] -> [Reply]) -> IO [Reply] -> IO [Reply]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
askNode ([NodeID
"ASKING"] [NodeID] -> [[NodeID]] -> [[NodeID]]
forall a. a -> [a] -> [a]
: [[NodeID]]
requests)
Maybe NodeConnection
Nothing -> case Port
retryCount of
Port
0 -> do
_ <- IO () -> IO ()
forall a. IO a -> IO a
hasLocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> (ShardMap -> IO ShardMap) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (IO ShardMap -> ShardMap -> IO ShardMap
forall a b. a -> b -> a
const IO ShardMap
refreshShardmapAction)
retryBatch shardMapVar refreshShardmapAction conn (retryCount + 1) requests replies
Port
_ -> MissingNodeException -> IO [Reply]
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (MissingNodeException -> IO [Reply])
-> MissingNodeException -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException ([[NodeID]] -> [NodeID]
forall a. HasCallStack => [a] -> a
head [[NodeID]]
requests)
Reply
_ -> [Reply] -> IO [Reply]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return [Reply]
replies
evaluateTransactionPipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluateTransactionPipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[NodeID]]
requests' = do
let requests :: [[NodeID]]
requests = [[NodeID]] -> [[NodeID]]
forall a. [a] -> [a]
reverse [[NodeID]]
requests'
let (Connection HashMap NodeID NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
conn
keys <- [[NodeID]] -> [NodeID]
forall a. Monoid a => [a] -> a
mconcat ([[NodeID]] -> [NodeID]) -> IO [[NodeID]] -> IO [NodeID]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([NodeID] -> IO [NodeID]) -> [[NodeID]] -> IO [[NodeID]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap) [[NodeID]]
requests
hashSlot <- hashSlotForKeys (CrossSlotException requests) keys
nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException (head requests)) hashSlot
resps <- requestNode nodeConn requests
when (any moved resps)
(hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction))
retriedResps <- retryBatch shardMapVar refreshShardmapAction conn 0 requests resps
return retriedResps
nodeConnForHashSlot :: Exception e => MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot :: forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn e
exception HashSlot
hashSlot = do
let (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = Connection
conn
(ShardMap shardMap) <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
node <-
case IntMap.lookup (fromEnum hashSlot) shardMap of
Maybe Shard
Nothing -> e -> IO Node
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO e
exception
Just (Shard Node
master [Node]
_) -> Node -> IO Node
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Node
master
case HM.lookup (nodeId node) nodeConns of
Maybe NodeConnection
Nothing -> e -> IO NodeConnection
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO e
exception
Just NodeConnection
nodeConn' -> NodeConnection -> IO NodeConnection
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return NodeConnection
nodeConn'
hashSlotForKeys :: Exception e => e -> [B.ByteString] -> IO HashSlot
hashSlotForKeys :: forall e. Exception e => e -> [NodeID] -> IO HashSlot
hashSlotForKeys e
exception [NodeID]
keys =
case [HashSlot] -> [HashSlot]
forall a. Eq a => [a] -> [a]
nub (NodeID -> HashSlot
keyToSlot (NodeID -> HashSlot) -> [NodeID] -> [HashSlot]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [NodeID]
keys) of
[] -> HashSlot -> IO HashSlot
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
0
[HashSlot
hashSlot] -> HashSlot -> IO HashSlot
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
hashSlot
[HashSlot]
_ -> e -> IO HashSlot
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (e -> IO HashSlot) -> e -> IO HashSlot
forall a b. (a -> b) -> a -> b
$ e
exception
requestKeys :: CMD.InfoMap -> [B.ByteString] -> IO [B.ByteString]
requestKeys :: InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap [NodeID]
request =
case InfoMap -> [NodeID] -> Maybe [NodeID]
CMD.keysForRequest InfoMap
infoMap [NodeID]
request of
Maybe [NodeID]
Nothing -> UnsupportedClusterCommandException -> IO [NodeID]
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (UnsupportedClusterCommandException -> IO [NodeID])
-> UnsupportedClusterCommandException -> IO [NodeID]
forall a b. (a -> b) -> a -> b
$ [NodeID] -> UnsupportedClusterCommandException
UnsupportedClusterCommandException [NodeID]
request
Just [NodeID]
k -> [NodeID] -> IO [NodeID]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return [NodeID]
k
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection (Error NodeID
errString) = case NodeID -> [NodeID]
Char8.words NodeID
errString of
[NodeID
"ASK", NodeID
_, NodeID
hostport] -> case Char -> NodeID -> [NodeID]
Char8.split Char
':' NodeID
hostport of
[NodeID
host, NodeID
portString] -> case NodeID -> Maybe (Port, NodeID)
Char8.readInt NodeID
portString of
Just (Port
port,NodeID
"") -> (Host, Port) -> Maybe (Host, Port)
forall a. a -> Maybe a
Just (NodeID -> Host
Char8.unpack NodeID
host, Port
port)
Maybe (Port, NodeID)
_ -> Maybe (Host, Port)
forall a. Maybe a
Nothing
[NodeID]
_ -> Maybe (Host, Port)
forall a. Maybe a
Nothing
[NodeID]
_ -> Maybe (Host, Port)
forall a. Maybe a
Nothing
askingRedirection Reply
_ = Maybe (Host, Port)
forall a. Maybe a
Nothing
moved :: Reply -> Bool
moved :: Reply -> Bool
moved (Error NodeID
errString) = case NodeID -> [NodeID]
Char8.words NodeID
errString of
NodeID
"MOVED":[NodeID]
_ -> Bool
True
[NodeID]
_ -> Bool
False
moved Reply
_ = Bool
False
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) Host
host Port
port = do
node <- ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap Host
host Port
port
HM.lookup (nodeId node) nodeConns
nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO [NodeConnection]
nodeConnectionForCommand :: Connection -> ShardMap -> [NodeID] -> IO [NodeConnection]
nodeConnectionForCommand conn :: Connection
conn@(Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) (ShardMap IntMap Shard
shardMap) [NodeID]
request =
case [NodeID]
request of
(NodeID
"FLUSHALL" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"FLUSHDB" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"QUIT" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"UNWATCH" : [NodeID]
_) -> IO [NodeConnection]
allNodes
[NodeID]
_ -> do
keys <- InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap [NodeID]
request
hashSlot <- hashSlotForKeys (CrossSlotException [request]) keys
node <- case IntMap.lookup (fromEnum hashSlot) shardMap of
Maybe Shard
Nothing -> MissingNodeException -> IO Node
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (MissingNodeException -> IO Node)
-> MissingNodeException -> IO Node
forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException [NodeID]
request
Just (Shard Node
master [Node]
_) -> Node -> IO Node
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Node
master
maybe (throwIO $ MissingNodeException request) (return . return) (HM.lookup (nodeId node) nodeConns)
where
allNodes :: IO [NodeConnection]
allNodes =
case Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes Connection
conn (IntMap Shard -> ShardMap
ShardMap IntMap Shard
shardMap) of
Maybe [NodeConnection]
Nothing -> MissingNodeException -> IO [NodeConnection]
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (MissingNodeException -> IO [NodeConnection])
-> MissingNodeException -> IO [NodeConnection]
forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException [NodeID]
request
Just [NodeConnection]
allNodes' -> [NodeConnection] -> IO [NodeConnection]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return [NodeConnection]
allNodes'
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) (ShardMap IntMap Shard
shardMap) =
(Node -> Maybe NodeConnection) -> [Node] -> Maybe [NodeConnection]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM ((NodeID -> HashMap NodeID NodeConnection -> Maybe NodeConnection)
-> HashMap NodeID NodeConnection -> NodeID -> Maybe NodeConnection
forall a b c. (a -> b -> c) -> b -> a -> c
flip NodeID -> HashMap NodeID NodeConnection -> Maybe NodeConnection
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup HashMap NodeID NodeConnection
nodeConns (NodeID -> Maybe NodeConnection)
-> (Node -> NodeID) -> Node -> Maybe NodeConnection
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Node -> NodeID
nodeId) [Node]
masterNodes
where
masterNodes :: [Node]
masterNodes = (\(Shard Node
master [Node]
_) -> Node
master) (Shard -> Node) -> [Shard] -> [Node]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Shard] -> [Shard]
forall a. Eq a => [a] -> [a]
nub (IntMap Shard -> [Shard]
forall a. IntMap a -> [a]
IntMap.elems IntMap Shard
shardMap)
requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode :: NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode (NodeConnection ConnectionContext
ctx IORef (Maybe NodeID)
lastRecvRef NodeID
_) [[NodeID]]
requests = do
([NodeID] -> IO ()) -> [[NodeID]] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (NodeID -> IO ()
sendNode (NodeID -> IO ()) -> ([NodeID] -> NodeID) -> [NodeID] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [NodeID] -> NodeID
renderRequest) [[NodeID]]
requests
_ <- ConnectionContext -> IO ()
CC.flush ConnectionContext
ctx
replicateM (length requests) recvNode
where
sendNode :: B.ByteString -> IO ()
sendNode :: NodeID -> IO ()
sendNode = ConnectionContext -> NodeID -> IO ()
CC.send ConnectionContext
ctx
recvNode :: IO Reply
recvNode :: IO Reply
recvNode = do
maybeLastRecv <- IORef (Maybe NodeID) -> IO (Maybe NodeID)
forall a. IORef a -> IO a
IOR.readIORef IORef (Maybe NodeID)
lastRecvRef
scanResult <- case maybeLastRecv of
Just NodeID
lastRecv -> IO NodeID -> Scanner Reply -> NodeID -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m NodeID -> Scanner a -> NodeID -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO NodeID
CC.recv ConnectionContext
ctx) Scanner Reply
reply NodeID
lastRecv
Maybe NodeID
Nothing -> IO NodeID -> Scanner Reply -> NodeID -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m NodeID -> Scanner a -> NodeID -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO NodeID
CC.recv ConnectionContext
ctx) Scanner Reply
reply NodeID
B.empty
case scanResult of
Scanner.Fail{} -> IO Reply
forall a. IO a
CC.errConnClosed
Scanner.More{} -> Host -> IO Reply
forall a. HasCallStack => Host -> a
error Host
"Hedis: parseWith returned Partial"
Scanner.Done NodeID
rest' Reply
r -> do
IORef (Maybe NodeID) -> Maybe NodeID -> IO ()
forall a. IORef a -> a -> IO ()
IOR.writeIORef IORef (Maybe NodeID)
lastRecvRef (NodeID -> Maybe NodeID
forall a. a -> Maybe a
Just NodeID
rest')
Reply -> IO Reply
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Reply
r
nodes :: ShardMap -> [Node]
nodes :: ShardMap -> [Node]
nodes (ShardMap IntMap Shard
shardMap) = ((Port, [Node]) -> [Node]) -> [(Port, [Node])] -> [Node]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (Port, [Node]) -> [Node]
forall a b. (a, b) -> b
snd ([(Port, [Node])] -> [Node]) -> [(Port, [Node])] -> [Node]
forall a b. (a -> b) -> a -> b
$ IntMap [Node] -> [(Port, [Node])]
forall a. IntMap a -> [(Port, a)]
IntMap.toList (IntMap [Node] -> [(Port, [Node])])
-> IntMap [Node] -> [(Port, [Node])]
forall a b. (a -> b) -> a -> b
$ (Shard -> [Node]) -> IntMap Shard -> IntMap [Node]
forall a b. (a -> b) -> IntMap a -> IntMap b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Shard -> [Node]
shardNodes IntMap Shard
shardMap where
shardNodes :: Shard -> [Node]
shardNodes :: Shard -> [Node]
shardNodes (Shard Node
master [Node]
slaves) = Node
masterNode -> [Node] -> [Node]
forall a. a -> [a] -> [a]
:[Node]
slaves
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap Host
host Port
port = (Node -> Bool) -> [Node] -> Maybe Node
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\(Node NodeID
_ NodeRole
_ Host
nodeHost Port
nodePort) -> Port
port Port -> Port -> Bool
forall a. Eq a => a -> a -> Bool
== Port
nodePort Bool -> Bool -> Bool
&& Host
host Host -> Host -> Bool
forall a. Eq a => a -> a -> Bool
== Host
nodeHost) (ShardMap -> [Node]
nodes ShardMap
shardMap)
nodeId :: Node -> NodeID
nodeId :: Node -> NodeID
nodeId (Node NodeID
theId NodeRole
_ Host
_ Port
_) = NodeID
theId
hasLocked :: IO a -> IO a
hasLocked :: forall a. IO a -> IO a
hasLocked IO a
action =
IO a
action IO a -> [Handler a] -> IO a
forall a. IO a -> [Handler a] -> IO a
`catches`
[ (BlockedIndefinitelyOnMVar -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler ((BlockedIndefinitelyOnMVar -> IO a) -> Handler a)
-> (BlockedIndefinitelyOnMVar -> IO a) -> Handler a
forall a b. (a -> b) -> a -> b
$ \exc :: BlockedIndefinitelyOnMVar
exc@BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar -> BlockedIndefinitelyOnMVar -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO BlockedIndefinitelyOnMVar
exc
]