| Copyright | (c) 2021 Juan Pablo Royo Sales | 
|---|---|
| License | BSD3 | 
| Maintainer | juanpablo.royo@gmail.com | 
| Stability | experimental | 
| Portability | GHC | 
| Safe Haskell | None | 
| Language | Haskell2010 | 
DynamicPipeline
Description
DynamicPipeline is a Type Safe Dynamic and Parallel Streaming Library, which is an implementation of the Dynamic Pipeline Paradigm (DPP) proposed in this paper DPP. The aim of this Library is to provide all the Type level constructs to guide the user in building a DPP flow to solve any algorithm that fits this computational model.
This implementation has been developed using Type Level Programming techniques like Type families, Defunctionalization, Existential Types and Dynamic Record Tagged Types among others. Using all these techniques, we provide a High Level and Type Safe DynamicPipeline Library to build a Data Flow Algorithm, avoiding boilerplate code as much as possible while maintaining safety and robustness.
Example of Filtering Repeated elements of a Stream
```haskell
import DynamicPipeline

type DPExample = Source (Channel (Int :<+> Eof)) :=> Generator (Channel (Int :<+> Eof)) :=> Sink

source' :: Stage (WriteChannel Int -> DP s ())
source' = withSource @DPExample $ \cout -> unfoldT ([1 .. 1000] <> [1 .. 1000]) cout identity

generator' :: GeneratorStage DPExample (Maybe Int) Int s
generator' =
  let gen = withGenerator @DPExample genAction
   in mkGenerator gen filterTemp

genAction :: Filter DPExample (Maybe Int) Int s
          -> ReadChannel Int
          -> WriteChannel Int
          -> DP s ()
genAction filter' cin cout =
  let unfoldFilter = mkUnfoldFilterForAll' (`push` cout) filter' Just cin HNil
   in void $ unfoldF unfoldFilter

filterTemp :: Filter DPExample (Maybe Int) Int s
filterTemp = mkFilter actorRepeted

actorRepeted :: Int
             -> ReadChannel Int
             -> WriteChannel Int
             -> StateT (Maybe Int) (DP s) ()
actorRepeted i rc wc = do
  liftIO $ foldM rc $ \e -> if e /= i then push e wc else pure ()

sink' :: Stage (ReadChannel Int -> DP s ())
sink' = withSink @DPExample $ flip foldM print

program :: IO ()
program = runDP $ mkDP @DPExample source' generator' sink'
```
Synopsis
- data Eof
 - data Sink
 - data Generator (a :: Type)
 - data Source (a :: Type)
 - data Channel (a :: Type)
 - data FeedbackChannel (a :: Type)
 - data a :=> b = a :=> b
 - data chann1 :<+> chann2 = chann1 :<+> chann2
 - data DynamicPipeline dpDefinition filterState filterParam st
 - data Filter dpDefinition filterState filterParam st
 - data Actor dpDefinition filterState filterParam monadicAction
 - data GeneratorStage dpDefinition filterState filterParam st
 - data Stage a
 - type family ValidDP (a :: Bool) :: Constraint where ...
 - type family IsDP (dpDefinition :: k) :: Bool where ...
 - data DP st a
 - data UnFoldFilter dpDefinition readElem st filterState filterParam l
 - withDP :: IO a -> DP s a
 - mkGenerator :: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st)) -> Filter dpDefinition filterState filterParam st -> GeneratorStage dpDefinition filterState filterParam st
 - mkFilter :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) -> Filter dpDefinition filterState filterParam st
 - single :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) -> NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))
 - actor :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) -> Actor dpDefinition filterState filterParam (StateT filterState (DP st))
 - (|>>>) :: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -> Filter dpDefinition filterState filterParam st -> Filter dpDefinition filterState filterParam st
 - (|>>) :: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -> Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -> Filter dpDefinition filterState filterParam st
 - withSource :: forall (dpDefinition :: Type) st. WithSource dpDefinition (DP st) -> Stage (WithSource dpDefinition (DP st))
 - withGenerator :: forall (dpDefinition :: Type) (filter :: Type) st. WithGenerator dpDefinition filter (DP st) -> Stage (WithGenerator dpDefinition filter (DP st))
 - withSink :: forall (dpDefinition :: Type) st. WithSink dpDefinition (DP st) -> Stage (WithSink dpDefinition (DP st))
 - mkDP :: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi. DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi => Stage (WithSource dpDefinition (DP st)) -> GeneratorStage dpDefinition filterState filterParam st -> Stage (WithSink dpDefinition (DP st)) -> DP st ()
 - runDP :: (forall st. DP st a) -> IO a
 - unfoldF :: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4. SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4 => UnFoldFilter dpDefinition readElem st filterState filterParam l -> DP st (HList l)
 - mkUnfoldFilter :: (readElem -> Bool) -> (readElem -> DP st ()) -> Filter dpDefinition filterState filterParam st -> (readElem -> filterState) -> ReadChannel readElem -> HList l -> UnFoldFilter dpDefinition readElem st filterState filterParam l
 - mkUnfoldFilter' :: (readElem -> Bool) -> Filter dpDefinition filterState filterParam st -> (readElem -> filterState) -> ReadChannel readElem -> HList l -> UnFoldFilter dpDefinition readElem st filterState filterParam l
 - mkUnfoldFilterForAll :: Filter dpDefinition filterState filterParam st -> (readElem -> filterState) -> ReadChannel readElem -> HList l -> UnFoldFilter dpDefinition readElem st filterState filterParam l
 - mkUnfoldFilterForAll' :: (readElem -> DP st ()) -> Filter dpDefinition filterState filterParam st -> (readElem -> filterState) -> ReadChannel readElem -> HList l -> UnFoldFilter dpDefinition readElem st filterState filterParam l
 - (.*.) :: HExtend e l => e -> l -> HExtendR e l
 - data family HList (l :: [Type])
 - hHead :: forall e (l :: [Type]). HList (e ': l) -> e
 - hTail :: forall e (l :: [Type]). HList (e ': l) -> HList l
 - data ReadChannel a
 - data WriteChannel a
 - (|=>) :: MonadIO m => ReadChannel a -> WriteChannel a -> m ()
 - (|=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
 - (|>=>) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
 - (|>=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
 - mapF_ :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
 - map_ :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m ()
 - mapM_ :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
 - mapMF_ :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m ()
 - foldM_ :: MonadIO m => ReadChannel a -> (a -> m ()) -> m ()
 - foldWithM_ :: MonadIO m => ReadChannel a -> m () -> (a -> m ()) -> m ()
 - push :: MonadIO m => a -> WriteChannel a -> m ()
 - pull :: MonadIO m => ReadChannel a -> m (Maybe a)
 - finish :: MonadIO m => WriteChannel a -> m ()
 - unfoldM :: forall m a b. MonadIO m => m a -> (a -> b) -> m Bool -> WriteChannel b -> m ()
 - unfoldFile :: MonadIO m => FilePath -> WriteChannel b -> (ByteString -> b) -> m ()
 - unfoldT :: (MonadIO m, Foldable t) => t a -> WriteChannel b -> (a -> b) -> m ()
 
DP Flow Grammar
The following is the Context Free Grammar allowed to build a DPP Flow definition:
```
DP       -> Source CHANS :=> Generator CHANS :=> Sink
DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink
CHANS    -> Channel CH
FEEDBACK -> FeedbackChannel CH
CH       -> Type :<+> CH | Eof
```
Example:
Source(Channel(Int:<+>Int)):=>Generator(Channel(Int:<+>Int)):=>Sink
Or with a Feedback Channel to retrofit Streaming
Source(Channel(Int:<+>Int)):=>Generator(Channel(Int:<+>Int)):=>FeedbackChannel(String:<+>Eof):=>Sink
Building DynamicPipeline 
The DynamicPipeline Data type is the point where all the information is contained so that the library can run our DP Algorithm.
This Data type contains three fundamental pieces: Source, Generator and Sink. But all these are dynamic based on the defined Flow. One of the fundamental features of this Library is to provide several combinators that deduce, from the Flow Definition, the Function Signatures the user must fulfill according to their definition.
All these combinators work in the same manner: based on the flow definition, they present to the user at compile time the function that must be provided. Let's see an example based on Misc.RepeatedDP, which basically filters out repeated elements in a stream.
```
>>> import Relude
>>> import DynamicPipeline
>>> type DPEx = Source (Channel (Int :<+> Eof)) :=> Generator (Channel (Int :<+> Eof)) :=> Sink
>>> :t withSource @DPEx
withSource @DPEx :: forall k (st :: k). (WriteChannel Int -> DP st ()) -> Stage (WriteChannel Int -> DP st ())
```
In type DPEx = .. we are defining a Flow which contains a Source that is going to have an Int Channel that is going to feed the Generator. Therefore the Source should write on that channel, and because of that we are being asked to provide a Function of type WriteChannel Int -> DP st (). Remember that our Monadic context is always DP.
Having that we can provide that function and have all the pieces together for Source.
```
>>> let source' = withSource @DPEx $ \wc -> unfoldT ([1..10] <> [1..10] <> [1..10] <> [1..10]) wc identity
>>> :t source'
source' :: forall k (st :: k). Stage (WriteChannel Int -> DP st ())
```
So we are done: we have provided that function. Now we can do the same for Sink, which is the opposite end of the Stream, because Generator is a little different, as we can see in the documentation.
```
>>> let sink' = withSink @DPEx $ \rc -> foldM rc $ putStr . show
>>> :t sink'
sink' :: forall k (st :: k). Stage (ReadChannel Int -> DP st ())
```
Done with Sink.
Generator and Filter
Now we reach the last piece, which needs more work because it is the core of DPP: it dynamically adds Parallel computations between the Generator Stage and previous Filters and Source.
Fortunately we have the same combinator withGenerator, but it is not so straightforward what to put there. So, let's go step by step.
```
>>> :t withGenerator @DPEx
withGenerator @DPEx :: forall k filter (st :: k). (filter -> ReadChannel Int -> WriteChannel Int -> DP st ()) -> Stage (filter -> ReadChannel Int -> WriteChannel Int -> DP st ())
```
At first glance it is asking for a similar function that is going to return our desired Stage, but there is a type parameter which is not so obvious: filter. Fortunately we have combinators for that as well, which can save us a lot of time and effort.
Note: We could have built a Generator with an Empty Filter, but then we would not be taking advantage of DPP in building a Pipeline Parallelization Computational Algorithm.
In the case of Filter we have several combinators at our disposal.
- Use mkFilter if your DPP contains 1 actor per Filter
- Use |>> and |>>> if your DPP contains more than 1 actor
In our example we are going to use only 1 actor, which is going to discard repeated elements:
```
>>> :t mkFilter @DPEx actor1
Variable not in scope: actor1 :: filterParam -> ReadChannel Int -> WriteChannel Int -> StateT filterState (DP st) ()
```
First, let's fill in the gaps.
```
>>> let filter' = mkFilter @DPEx (\i rc wc -> foldM rc $ \e -> if e /= i then push e wc else pure ())
>>> :t filter'
filter' :: forall k filterState (st :: k). Filter DPEx filterState Int st
```
Basically we are checking if the element that we are reading from the Channel (remember that we can have multiple Filters in front writing to us) is equal to the First Element that was read by the Generator and with which this Filter was instantiated (a.k.a. filterParam). If the element is not equal we push it to the next Filter or Generator; otherwise we discard it.
```
>>> let gen' = mkGenerator (withGenerator @DPEx $ \f r w -> let unf = mkUnfoldFilterForAll' (`push` w) f Just r HNil in void $ unfoldF unf) filter'
>>> :t gen'
gen' :: forall k (st :: k). GeneratorStage DPEx (Maybe Int) Int st
```
Now that we have everything in place, we only need to call runDP and mkDP.
```
>>> runDP $ mkDP @DPEx source' gen' sink'
12345678910
```
Types Flow definition
Eof is the End of Channel mark in the DP Definition Flow
Instances
data Generator (a :: Type) Source #
Generator contains the Generator Stage and its Channel definitions in the DP definition Flow.
a ~ Channel
Instances
data Source (a :: Type) Source #
Instances
data Channel (a :: Type) Source #
Channel is the Container Type of Open Union Type which is going to be defined with :<+>.
a ~ (Type:<+>Type:<+>...:<+>Eof)
Instances
data FeedbackChannel (a :: Type) Source #
FeedbackChannel is the Container Type of Open Union Type which is going to be defined with :<+> and indicates that this Channel is for feedback to Source
a ~ (Type:<+>Type:<+>...:<+>Eof)
Instances
| MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # | |
Defined in DynamicPipeline.Flow  | |
| (MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # | |
| type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # | |
Defined in DynamicPipeline.Flow  | |
| type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # | |
data a :=> b infixr 5 Source #
This is the Type level function of the Open Union Type for Stages.
This should have the form:
Source(Channel..):=>Generator(Channel..):=>Sink
Constructors
| a :=> b infixr 5 | 
Instances
| Functor ((:=>) a) Source # | |
| Foldable ((:=>) a) Source # | |
Defined in DynamicPipeline.Flow Methods fold :: Monoid m => (a :=> m) -> m # foldMap :: Monoid m => (a0 -> m) -> (a :=> a0) -> m # foldMap' :: Monoid m => (a0 -> m) -> (a :=> a0) -> m # foldr :: (a0 -> b -> b) -> b -> (a :=> a0) -> b # foldr' :: (a0 -> b -> b) -> b -> (a :=> a0) -> b # foldl :: (b -> a0 -> b) -> b -> (a :=> a0) -> b # foldl' :: (b -> a0 -> b) -> b -> (a :=> a0) -> b # foldr1 :: (a0 -> a0 -> a0) -> (a :=> a0) -> a0 # foldl1 :: (a0 -> a0 -> a0) -> (a :=> a0) -> a0 # toList :: (a :=> a0) -> [a0] # elem :: Eq a0 => a0 -> (a :=> a0) -> Bool # maximum :: Ord a0 => (a :=> a0) -> a0 # minimum :: Ord a0 => (a :=> a0) -> a0 #  | |
| Traversable ((:=>) a) Source # | |
| MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # | |
Defined in DynamicPipeline.Flow  | |
| MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # | |
| (Bounded a, Bounded b) => Bounded (a :=> b) Source # | |
| (Eq a, Eq b) => Eq (a :=> b) Source # | |
| (Show a, Show b) => Show (a :=> b) Source # | |
| (MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # | |
| (MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # | |
| type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # | |
Defined in DynamicPipeline.Flow  | |
| type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # | |
| type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # | |
| type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # | |
data chann1 :<+> chann2 infixr 5 Source #
This is the Type level function of the Open Union Type for Channels.
Channels form an Open Union Type in each stage because, according to DPP, we can have multiple In and Out Channels in a Single Stage.
Eof should be the last Channel of the Open Union Type to indicate termination of the Grammar.
chann1 ~ Type
chann2 ~ Type
Constructors
| chann1 :<+> chann2 infixr 5 | 
Instances
| Functor ((:<+>) chann1) Source # | |
| Foldable ((:<+>) chann1) Source # | |
Defined in DynamicPipeline.Flow Methods fold :: Monoid m => (chann1 :<+> m) -> m # foldMap :: Monoid m => (a -> m) -> (chann1 :<+> a) -> m # foldMap' :: Monoid m => (a -> m) -> (chann1 :<+> a) -> m # foldr :: (a -> b -> b) -> b -> (chann1 :<+> a) -> b # foldr' :: (a -> b -> b) -> b -> (chann1 :<+> a) -> b # foldl :: (b -> a -> b) -> b -> (chann1 :<+> a) -> b # foldl' :: (b -> a -> b) -> b -> (chann1 :<+> a) -> b # foldr1 :: (a -> a -> a) -> (chann1 :<+> a) -> a # foldl1 :: (a -> a -> a) -> (chann1 :<+> a) -> a # toList :: (chann1 :<+> a) -> [a] # null :: (chann1 :<+> a) -> Bool # length :: (chann1 :<+> a) -> Int # elem :: Eq a => a -> (chann1 :<+> a) -> Bool # maximum :: Ord a => (chann1 :<+> a) -> a # minimum :: Ord a => (chann1 :<+> a) -> a #  | |
| Traversable ((:<+>) chann1) Source # | |
Defined in DynamicPipeline.Flow  | |
| (Bounded chann1, Bounded chann2) => Bounded (chann1 :<+> chann2) Source # | |
| (Eq chann1, Eq chann2) => Eq (chann1 :<+> chann2) Source # | |
| (Show chann1, Show chann2) => Show (chann1 :<+> chann2) Source # | |
| MkCh more => MkCh (a :<+> more) Source # | |
| type HChI (a :<+> more) Source # | |
Defined in DynamicPipeline.Flow  | |
| type HChO (a :<+> more) Source # | |
Defined in DynamicPipeline.Flow  | |
Smart Constructors
data DynamicPipeline dpDefinition filterState filterParam st Source #
DynamicPipeline data type which contains all the three Stages definitions that have been generated by other combinators like withSource, withGenerator and withSink.
- dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink - DP Type level Flow Definition
- filterState - State of the StateT Monad that is the local State of the Filter execution
- filterParam - Type of the First Parameter that is passed to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
- st - Existential Scope of DP Monad.
data Filter dpDefinition filterState filterParam st Source #
Filter Is the template definition of the Filter that may be spawned when reading elements on the Stream.
- Filter is a NonEmpty List of Actors.
- Each Actor is executed sequentially on that List when an Element arrives at that Filter instance.
- All the Filter execution (a.k.a. forM_ actors runStage) executes in a StateT Monad to share an internal state among Actors.
- dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink - DP Type level Flow Definition
- filterState - State of the StateT Monad that is the local State of the Filter execution
- filterParam - Type of the First Parameter that is passed to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
- st - Existential Scope of DP Monad.
Instances
| Generic (Filter dpDefinition filterState filterParam st) Source # | |
Defined in DynamicPipeline.Stage  | |
| Wrapped (Filter s' s a param) Source # | |
Defined in DynamicPipeline.Stage Associated Types type Unwrapped (Filter s' s a param)  | |
| type Rep (Filter dpDefinition filterState filterParam st) Source # | |
Defined in DynamicPipeline.Stage type Rep (Filter dpDefinition filterState filterParam st) = D1 ('MetaData "Filter" "DynamicPipeline.Stage" "dynamic-pipeline-0.3.1.2-inplace" 'True) (C1 ('MetaCons "Filter" 'PrefixI 'True) (S1 ('MetaSel ('Just "unFilter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))))))  | |
| type Unwrapped (Filter s' s a param) Source # | |
Defined in DynamicPipeline.Stage  | |
data Actor dpDefinition filterState filterParam monadicAction Source #
Actor Is a particular Stage computation inside a Filter.
- dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink - DP Type level Flow Definition
- filterState - State of the StateT Monad that is the local State of the Filter execution
- filterParam - Type of the First Parameter that is passed to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
- monadicAction - Monad Wrapped in StateT.
data GeneratorStage dpDefinition filterState filterParam st Source #
GeneratorStage is a special Stage data type according to DPP Definition which contains a Filter template definition, in order to know how to spawn a new Filter if it is needed, and the Stage of the Generator to allow the user to perform some computation in that case.
- dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink - DP Type level Flow Definition
- filterState - State of the StateT Monad that is the local State of the Filter execution
- filterParam - Type of the First Parameter that is passed to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
- st - Existential Scope of DP Monad.
type family ValidDP (a :: Bool) :: Constraint where ... Source #
FCF - Type Level Defunctionalization ValidDP Check if IsDP is True
- a - IsDP dpDefinition ~ 'True
Throw a TypeError if Grammar is not correct
Equations
| ValidDP 'True = () | |
| ValidDP 'False = TypeError ((((((('Text "Invalid Semantic for Building DP Program" :$$: 'Text "Language Grammar:") :$$: 'Text "DP -> Source CHANS :=> Generator CHANS :=> Sink") :$$: 'Text "DP -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink") :$$: 'Text "CHANS -> Channel CH") :$$: 'Text "FEEDBACK -> FeedbackChannel CH") :$$: 'Text "CH -> Type :<+> CH | Eof") :$$: 'Text "Example: 'Source (Channel (Int :<+> Int)) :=> Generator (Channel (Int :<+> Int)) :=> Sink'") | 
type family IsDP (dpDefinition :: k) :: Bool where ... Source #
FCF - Type Level Defunctionalization IsDP Validates if DP Flow Type Level Definition is Correct according to the Grammar
Equations
| IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut))) | |
| IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut))) | |
| IsDP (Source (Channel (a :<+> more))) = IsDP (Source (Channel more)) | |
| IsDP (Source (Channel Eof)) = 'True | |
| IsDP (Generator (Channel (a :<+> more))) = IsDP (Generator (Channel more)) | |
| IsDP (Generator (Channel a)) = 'True | |
| IsDP x = 'False | 
DP is the only Monadic Action allowed to run a DP Defined Flow. It is restricted on Scope by its Existential Type st in order to not escape out from this Monadic Context.
data UnFoldFilter dpDefinition readElem st filterState filterParam l Source #
UnFoldFilter is a wrapper Data Type that contains all the information needed to spawn Filter instances according to DPP. The user will have the capability to select how those filters are going to be spawned, for example on each read element, how to setup initial states of StateT Monad on Actor computations in filters, among others.
- dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink - DP Type level Flow Definition
- readElem - Type of the element that is being read from the Selected Channel in the Generator Stage
- st - Existential Scope of DP Monad.
- filterState - State of the StateT Monad that is the local State of the Filter execution
- filterParam - Type of the First Parameter that is passed to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
 
Arguments
| :: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st)) | Generator   | 
| -> Filter dpDefinition filterState filterParam st | 
  | 
| -> GeneratorStage dpDefinition filterState filterParam st | 
Smart Constructor of GeneratorStage.
Arguments
| :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) | Associated type family to Generate Function Signature  | 
| -> Filter dpDefinition filterState filterParam st | 
Smart Constructor of Filter.
Arguments
| :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) | Associated type family to Generate Function Signature  | 
| -> Actor dpDefinition filterState filterParam (StateT filterState (DP st)) | 
Smart Constructor of Actor.
Arguments
| :: forall (dpDefinition :: Type) st. WithSource dpDefinition (DP st) | Associated type family to Generate Function Signature  | 
| -> Stage (WithSource dpDefinition (DP st)) | 
Arguments
| :: forall (dpDefinition :: Type) (filter :: Type) st. WithGenerator dpDefinition filter (DP st) | Associated type family to Generate Function Signature  | 
| -> Stage (WithGenerator dpDefinition filter (DP st)) | 
Arguments
| :: forall (dpDefinition :: Type) st. WithSink dpDefinition (DP st) | Associated type family to Generate Function Signature  | 
| -> Stage (WithSink dpDefinition (DP st)) | 
Arguments
| :: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi. DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi | |
| => Stage (WithSource dpDefinition (DP st)) | 
  | 
| -> GeneratorStage dpDefinition filterState filterParam st | 
  | 
| -> Stage (WithSink dpDefinition (DP st)) | |
| -> DP st () | 
Smart constructor for DynamicPipeline Definition
Arguments
| :: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4. SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4 | |
| => UnFoldFilter dpDefinition readElem st filterState filterParam l | |
| -> DP st (HList l) | Return the list of   | 
Run UnFoldFilter
Arguments
| :: (readElem -> Bool) | Given a new Element determine if we need to interpose a new Filter or not  | 
| -> (readElem -> DP st ()) | For each element that the Filter is consuming allow to do something outside the filter with that element. For example trace or debug  | 
| -> Filter dpDefinition filterState filterParam st | 
  | 
| -> (readElem -> filterState) | Given the First element in this Filter Instance how to Initiate Internal   | 
| -> ReadChannel readElem | Main   | 
| -> HList l | 
  | 
| -> UnFoldFilter dpDefinition readElem st filterState filterParam l | 
Smart Constructor for UnFoldFilter
Arguments
| :: (readElem -> Bool) | |
| -> Filter dpDefinition filterState filterParam st | |
| -> (readElem -> filterState) | |
| -> ReadChannel readElem | |
| -> HList l | |
| -> UnFoldFilter dpDefinition readElem st filterState filterParam l | 
Smart Constructor for UnFoldFilter which skips doing something externally on each read element
Arguments
| :: Filter dpDefinition filterState filterParam st | |
| -> (readElem -> filterState) | |
| -> ReadChannel readElem | |
| -> HList l | |
| -> UnFoldFilter dpDefinition readElem st filterState filterParam l | 
Smart Constructor for UnFoldFilter that creates a Filter for each element on the Read Channel and interposes it in front of the Generator Stage, after the last Filter
Source ---> Filter1 ---> Filter2 ... ---> FilterN ---> Generator ---> Sink
mkUnfoldFilterForAll' Source #
Arguments
| :: (readElem -> DP st ()) | |
| -> Filter dpDefinition filterState filterParam st | |
| -> (readElem -> filterState) | |
| -> ReadChannel readElem | |
| -> HList l | |
| -> UnFoldFilter dpDefinition readElem st filterState filterParam l | 
Same as mkUnfoldFilterForAll, but does something externally on each element
data family HList (l :: [Type]) #
Instances
| (SameLengths '[x, y, xy], HZipList x y xy) => HUnzip HList x y xy | |
Defined in Data.HList.HList  | |
| (SameLengths '[x, y, xy], HZipList x y xy) => HZip HList x y xy | |
Defined in Data.HList.HList  | |
| HMapAux HList f ('[] :: [Type]) ('[] :: [Type]) | |
Defined in Data.HList.HList  | |
| (HSpanEqBy f a as fst snd, HGroupBy f snd gs) => HGroupBy (f :: t) (a ': as) (HList (a ': fst) ': gs) | |
| (ApplyAB f e e', HMapAux HList f l l', SameLength l l') => HMapAux HList f (e ': l) (e' ': l') | |
Defined in Data.HList.HList  | |
| HReverse l l' => HBuild' l (HList l') | |
Defined in Data.HList.HList  | |
| HExtend e (HList l) | |
| HInits1 a b => HInits a (HList ('[] :: [Type]) ': b) | |
Defined in Data.HList.HList  | |
| (Bounded x, Bounded (HList xs)) => Bounded (HList (x ': xs)) | |
| Bounded (HList ('[] :: [Type])) | |
| (Eq x, Eq (HList xs)) => Eq (HList (x ': xs)) | |
| Eq (HList ('[] :: [Type])) | |
| (Ord x, Ord (HList xs)) => Ord (HList (x ': xs)) | |
Defined in Data.HList.HList Methods compare :: HList (x ': xs) -> HList (x ': xs) -> Ordering # (<) :: HList (x ': xs) -> HList (x ': xs) -> Bool # (<=) :: HList (x ': xs) -> HList (x ': xs) -> Bool # (>) :: HList (x ': xs) -> HList (x ': xs) -> Bool # (>=) :: HList (x ': xs) -> HList (x ': xs) -> Bool # max :: HList (x ': xs) -> HList (x ': xs) -> HList (x ': xs) # min :: HList (x ': xs) -> HList (x ': xs) -> HList (x ': xs) #  | |
| Ord (HList ('[] :: [Type])) | |
| (HProxies l, Read e, HSequence ReadP (ReadP e ': readP_l) (e ': l), HMapCxt HList ReadElement (AddProxy l) readP_l) => Read (HList (e ': l)) | |
| Read (HList ('[] :: [Type])) | |
| (Show e, Show (HList l)) => Show (HList (e ': l)) | |
| Show (HList ('[] :: [Type])) | |
| (Ix x, Ix (HList xs)) => Ix (HList (x ': xs)) | |
Defined in Data.HList.HList Methods range :: (HList (x ': xs), HList (x ': xs)) -> [HList (x ': xs)] # index :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Int # unsafeIndex :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Int # inRange :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Bool # rangeSize :: (HList (x ': xs), HList (x ': xs)) -> Int # unsafeRangeSize :: (HList (x ': xs), HList (x ': xs)) -> Int #  | |
| Ix (HList ('[] :: [Type])) | |
Defined in Data.HList.HList Methods range :: (HList '[], HList '[]) -> [HList '[]] # index :: (HList '[], HList '[]) -> HList '[] -> Int # unsafeIndex :: (HList '[], HList '[]) -> HList '[] -> Int # inRange :: (HList '[], HList '[]) -> HList '[] -> Bool # rangeSize :: (HList '[], HList '[]) -> Int # unsafeRangeSize :: (HList '[], HList '[]) -> Int #  | |
| (HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Semigroup (HList a) | |
| (HProxies a, HMapCxt HList ConstMempty (AddProxy a) a, HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Monoid (HList a) | |
| (TypeRepsList (HList xs), Typeable x) => TypeRepsList (HList (x ': xs)) | |
Defined in Data.HList.Data Methods typeRepsList :: HList (x ': xs) -> [TypeRep]  | |
| TypeRepsList (HList ('[] :: [Type])) | |
Defined in Data.HList.Data Methods typeRepsList :: HList '[] -> [TypeRep]  | |
| HAppendList l1 l2 => HAppend (HList l1) (HList l2) | |
| ApplyAB f e e' => ApplyAB (MapCar f) (e, HList l) (HList (e' ': l)) | |
Defined in Data.HList.HList  | |
| HInits1 ('[] :: [Type]) '[HList ('[] :: [Type])] | |
Defined in Data.HList.HList  | |
| HTails ('[] :: [Type]) '[HList ('[] :: [Type])] | |
Defined in Data.HList.HList  | |
| Apply (FHUProj sel ns) (HList l, Proxy ('HSucc n)) => Apply (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n) | |
| Apply (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n) | |
| (ch ~ Proxy (HBoolEQ sel (KMember n ns)), Apply (ch, FHUProj sel ns) (HList (e ': l), Proxy n)) => Apply (FHUProj sel ns) (HList (e ': l), Proxy n) | |
| Apply (FHUProj sel ns) (HList ('[] :: [Type]), n) | |
Defined in Data.HList.HArray Associated Types type ApplyR (FHUProj sel ns) (HList '[], n)  | |
| (HConcatFD as bs, HAppendFD a bs cs) => HConcatFD (HList a ': as) cs | |
Defined in Data.HList.HList  | |
| (HInits1 xs ys, HMapCxt HList (FHCons2 x) ys ys', HMapCons x ys ~ ys', HMapTail ys' ~ ys) => HInits1 (x ': xs) (HList '[x] ': ys') | |
Defined in Data.HList.HList  | |
| HTails xs ys => HTails (x ': xs) (HList (x ': xs) ': ys) | |
Defined in Data.HList.HList  | |
| HMapUnboxF xs us => HMapUnboxF (HList x ': xs) (RecordU x ': us) | |
Defined in Data.HList.RecordU  | |
| (HList (x ': y) ~ z, HZip3 xs ys zs) => HZip3 (x ': xs) (HList y ': ys) (z ': zs) | |
| type HExtendR e (HList l) | |
Defined in Data.HList.HList  | |
| type HMapCons x (HList a ': b) | |
Defined in Data.HList.HList  | |
| data HList ('[] :: [Type]) | |
Defined in Data.HList.HList  | |
| type UnHList (HList a) | |
Defined in Data.HList.HList type UnHList (HList a) = a  | |
| type HAppendR (HList l1 :: Type) (HList l2 :: Type) | |
| type ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n) | |
| type ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n) | |
| type ApplyR (FHUProj sel ns) (HList ('[] :: [Type]), n) | |
Defined in Data.HList.HArray  | |
| type ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n) | |
| data HList (x ': xs) | |
Defined in Data.HList.HList  | |
| type HMapTail (HList (a ': as) ': bs) | |
Defined in Data.HList.HList  | |
Channels
data ReadChannel a Source #
ReadChannel can only read values from a previously written Channel. It is connected to a WriteChannel, but that connection is hidden from the user.
a- Type that this Channel can read
 
data WriteChannel a Source #
WriteChannel can only write values into some Channel Queue
a- Type that this Channel can write
 
(|=>) :: MonadIO m => ReadChannel a -> WriteChannel a -> m () infixl 5 Source #
(|=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m () infixl 5 Source #
Alias mapF_
(|>=>) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m () infixr 5 Source #
Alias mapM_
(|>=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m () infixr 5 Source #
Alias mapMF_
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> WriteChannel b | |
| -> (a -> b) | Pure transformation to apply to each read element  | 
| -> m () | 
Same as map_ but marks the Channel as Eof after all processing
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> WriteChannel b | |
| -> (a -> b) | Pure transformation to apply to each read element  | 
| -> m () | 
map_ is a Natural Transformation from consumer ReadChannel to some producer WriteChannel applying a transformation with function f
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> WriteChannel b | |
| -> (a -> m (Maybe b)) | Monadic Transformation to do with read element  | 
| -> m () | 
Same as map_ but applying a Monadic mapping
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> WriteChannel b | |
| -> (a -> m (Maybe b)) | Monadic Transformation to do with read element  | 
| -> m () | 
Same as mapM_ but marks the Channel as Eof after all processing
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> (a -> m ()) | Computation to do with read element  | 
| -> m () | 
foldM_ is a Catamorphism for consuming a ReadChannel and performing some Monadic m computation with each element
Arguments
| :: MonadIO m | |
| => ReadChannel a | |
| -> m () | Computation to do at the end of the channel  | 
| -> (a -> m ()) | Computation to do with read element  | 
| -> m () | 
Same as foldM_ but allows passing a monadic computation to perform at the end of the Channel
push :: MonadIO m => a -> WriteChannel a -> m () Source #
Push element a into WriteChannel
pull :: MonadIO m => ReadChannel a -> m (Maybe a) Source #
Pull element Maybe a from ReadChannel
finish :: MonadIO m => WriteChannel a -> m () Source #
Finalize the Channel to signal the EOF mark and allow the following consumers to make progress
Arguments
| :: forall m a b. MonadIO m | |
| => m a | Monadic Seed  | 
| -> (a -> b) | Map input from seed to something to be written in Channel  | 
| -> m Bool | When to stop unfolding  | 
| -> WriteChannel b | 
  | 
| -> m () | 
Coalgebra with Monadic computation to Feed some WriteChannel
m- Monadic computation wrapping Coalgebra
 a- Element obtained from some Source, to be written into some Channel
 
Unfold from a Monadic seed m a to a WriteChannel
Arguments
| :: MonadIO m | |
| => FilePath | Seed   | 
| -> WriteChannel b | 
  | 
| -> (ByteString -> b) | Transform   | 
| -> m () | 
Using unfoldM, unfold from file