Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions doc/design/csp.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ ch1 := make(chan int, 100) // a channel that can buffer 100 ints.
In Fluid, we should be able to do the same:

```python
ch = fluid.make_chan(dtype=INT)
ch1 = fluid.make_chan(dtype=INT, 100)
ch = fluid.make_channel(dtype=INT)
ch1 = fluid.make_channel(dtype=INT, 100)
```

In addition to that, we want channels that can hold more complex element types, e.g., Tensors of float16:

```python
ch = fluid.make_chan(dtype=Tensor, etype=float16)
ch = fluid.make_channel(dtype=Tensor, etype=float16)
```

or Tensors of Tensors of float16 etc.
Expand All @@ -87,6 +87,76 @@ The point here is that we need a consistent way to compose types, like in C++ we

### Send and Recv

In Go, we first create a channel as explained in the section above and then perform read and write operations on top of the channels.

```go
ch1 := make(chan int)
ch2 := make(chan int, 100)
```

To write (or perform a `Send` operation) the value of a variable `x`, to channel `ch1` above, we perform the following:

```go
ch1 <- x
fmt.Println("Written to the channel")
```
Now to read (or perform a `Recv` operation) the value stored in `ch2` into a variable `y`, we perform the following:

```go
y <- ch2
fmt.Println("Received from channel")
```

In Fluid, we should be able to perform the above operations on the channel objects as well. As of now, we support two different kinds of channels : [Buffered Channel](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/details/buffered_channel.h) and [UnBuffered Channel](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/details/unbuffered_channel.h)

Send and Receive can be performed as following on a buffered channel:

```python
import threading

def send_to_channel(channel, num_time=1):
for i in xrange(num_time):
channel.send(i)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The send operation must be a global function, but not the method of buffer, because it must be able to handle the case that the buffer is nil.
  2. The send operation has to be a fluid operator, not a Python function or method.

# Create a buffered channel of capacity 10
buffer_size = 10;
ch = fluid.make_channel(dtype=INT, buffer_size)

# Now write three elements to the channel
thread = threading.Thread(target=send_to_channel, args=(ch, 3, ))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check it works, we need fluid.go, instead of threading.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To illustrate how to use buffered channels, we don't need the goroutine, because the purpose is to show that the single thread example wouldn't block as long as the buffer size is big enough.

thread.daemon = True
thread.start()

# Read all the data from the channel
for i in xrange(3):
y = ch.recv()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

y = fluid.recv(ch)

It doesn't help at all to expose the C++ method Channel::Recv to Python as Channel::recv.


# Done receiving , now close the channel
ch.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fluid.close(ch)

```

The send and receive operations will be similar for unbuffered channel as well, except for the fact that there is no buffer in an unbuffered channel, so the operations are completely synchronized. For example:

```python
import threading

def send_to_channel(channel, data):
channel.send(data)

# Create an unbuffered channel
ch = fluid.make_channel(dtype=INT)

# Writes and Reads are synchronous otherwise the calls will block.
thread = threading.Thread(target=send_to_channel, args=(ch, 10, ))
thread.daemon = True
thread.start()

y = ch.recv()

# Done receiving , now close the channel
ch.close()
```

### Select

## Example Programs
Expand Down