상세 컨텐츠

본문 제목

Section 5.4.26 Pipelines –Build the Hydra Chat System

Go/Mastering Go

by Gopythor 2022. 3. 11. 00:50

본문

728x90
반응형

The Pipeline Pattern

https://blog.golang.org/pipelines
A concurrent pattern
A series of 'stages' connected by channels
Each stage is a group of goroutines running the same function

  • So what is the pipeline design pattern and detailed article covering the pattern can be found in the link shown here.
  • Pipeline is a design pattern that makes have used of go's concurrency.
  • And devides code into series of stages that can communicate using go channel's.
  • Each stage is a group of go routines performing the same function.
  • Each stage receives its input values via channels.
  • Then it performs some work on those values then send those values down using channels to the next stage.
  • First stage can be called the producer whereas the last stage can be called the consumer.
  • pipelines exist in many other languages besides go.
  • And are popular for efficient software design.

Hydra Chat System Pipeline

One cline sends a message to the chat server for a specific chat room
The chat server sends the same message to all clients in the same chat room

  • We will design our Hydra Chat system using a data pipeline.
  • There will be a chat server and multiple chat clients.
  • The data flow will start from one of the clients to the server and then back.
  • After a bunch of clients connect to the chat server, each client will get into a chat room.
  • And they will all share the same chat room.
  • That chat room will be stored in the chat server.
  • And then after that the server will broadcast the messages from any of other clients to other clients connected to the chat server on the same chat room.
  • So let's say this client sends a message to the chat server on a chat room.
  • Let's say that chat room includes all the other clients.
  • Then the message will be broadcasted to all the other clients.

Pipelines in the Chat Application

Stage 1 (producer):
StartClient() will send data to the msg ch
clients.go
↓msgCH channel
Stage 2:
broadcastMsg() will fan-out the message to multiple client channels
rooms.go
↓↓↓wc channel
Stage 3:
Use writemonitor to send the message to clients
clients.go

  • Let's see how we can use a pipeline pattern to design our chat application.
  • Let's assume the pipeline pattern starts after some clients are connected to the chat server.
  • The first stage in here is when we get a message from a client.
  • This message will go to a shared Channel called msgCH which says for message a channel.
  • The second stage, data on the message channel will then get copied to multiple channels.
  • Each of those channels will lead to another chat clients connected to our chat server.
  • The operation of taking value from a single channel and then copying it to multiple channels is called fan-out.
  • On the third stage, each one of those channels will send data to the chat client that presents.
  • We will see in our code how that is done.

hydrachat

  • We'll Create a project folder called hydrachat which will leave underneath the hydra folder.
  • That leaves inside the Go workspace source folder.
  • There are three main files that we care about under the hydrachat folder.
  • Those files are rooms.go, hydrachat.go, and clients.go

rooms.go

  • Let's start by looking at the code at rooms.go which will host that chat room code for our chat server.
  • The package name will be called hydrachat.
import ( 
        "fmt" 
        "io" 
        "net" 
        "sync" 
)
  • These will be the packages that we'll be using in our code.
  • We will see how we use each one as we go.
type room struct {  
    name string  
    Msgch chan string  
    clients map[chan<- string]struct{}  
    Quit chan struct{}  
    *sync.RWMutex  
}
  • Now we create a struct to present the chat room.
  • The struct will be called room.
  • And the fields inside will be the name of the chat room.
  • The message channel(Msgch) which will host messages passing through the chat room.
  • A clients map which will represent the people currently talking on the chat room.
  • So the map, keys will be channels of the type string.
  • When we use an arrow after the word chan, we indicate that this channel is a send only channel.
  • So we can only send strings to this channel but we cannot receive on this channel.
  • The reason why we use a map with an empty struct is to present a send data structure.
  • When we create a map with keys but at the same time with empty values which in go would be the equivalent of an empty struct.
  • Than that we will present a set data type.
  • So this map will represent a set of unique chat clients that will be connected to our chat server.
  • So whenever we send a message here, this message will be sent to the chat client connected to our chat server.
  • Or actually to be more specific, There'll be connected to the chat room inside our chat server.
  • And that chat room is on a presented by this room struct type.
  • We will then use a quit channel which will signal that the room is qutting.
  • And that all clients need to be connected.
  • So when a signal is sent over here or this channel is closed, then we know our chat room is closing.
  • we'll also embed in our room struct type.
  • A sync.RWMutex pointer type for used to protecting our client maps from concurrent access.
  • We'll see how to use it shortly.

CreateRoom


func CreateRoom(name string) *room {  
    r := &room{  
        name: name,  
        Msgch: make(chan string),  
        RWMutex: new(sync.RWMutex),  
        clients: make(map[chan<- string]struct{}),  
        Quit: make(chan struct{}),  
    }  
    r.Run()  
    return r  
}
  • Now let's write the code to create the room.
  • So use a function called CreateRoom which will take a string as an argument.
  • For presenting the room name, then we will return a pointer to a room object.
  • We will use a struct literal where will initialize all the fields in order to be able to use them the right way.
  • So initialize a message channel, initialize a clients map or set, initialize a quit channel.
  • And of course name is assigned the argument that gets passed to the constructor.
  • Before we return the room pointer type to the caller, We will call a method called run which belongs to the room type.

Run method


func (r *room) Run() {  
    logger.Println("Starting chat room", r.name)  
    go func() {  
    for msg := range r.Msgch {  
        r.broadcastMsg(msg)  
        }  
    }()  
}
  • This method will cause a room to start waiting for people or clients to connect to the chat room.
  • We will be implementing run shortly.
  • We'll see how we write it.
  • The run method will first output a log line indicating that we are starting the chat room.
  • And we'll print the chat room name.
  • Then we will create a Go routine which will host a for range statement listening of the chat room a message channel in order to capture any messages that get passed as a channel.
  • Whenever a chat client sends a message to our server chat room, The message will go to the message channel.
  • And then when we receive the message we'll broadcast this message to all the clients connected to our chat room.
  • We will implement the broadcast message very shortly to see how to do that.
  • The reason why we place this logic on a go routine is so that we do not block the run method.
  • So this will run as long as a message channel is alive.

broadcastMSG method


func (r *room) broadcastMsg(msg string) {  
    r.RLock()  
    defer r.RUnlock()  
    fmt.Println("Received message:", msg)  
    for wc, _ := range r.clients {  
        go func(wc chan<- string) {  
        wc <- msg  
        }(wc)  
    }  
}
  • Now let's explore the broadcast message method.
  • This method will take a message string that we obtain from the message channel.
  • And then it will send it to all the clients connected to our chat room.
  • Now we will use Read Locks.
  • Use how I use the defer here with RUnlock.
  • That is to guarantee that RUnlock will happen at the end of our function.
  • The reason why we use Read Locks instead of plain locks is because we are not adding or removing items to the client's map which is what we are protecting here.
  • What we are doing is that we are reading from our client's map which will provide us for each for loop.
  • The string channel for which the chat client is listening.
  • And then inside the Go routine, we will pass a string message that we are broadcasting to the channel of this specific client that we are currently on.
  • The reason why we call the client channels wc is because it presents write channels or send only channels.
  • We use anonymous function here is a Go routine for efficient coding.
  • So that each client will be handled on a different go routine concurrently.
  • The function will take an argument as shown
  • The arguments will be our client channel.
  • The reason why we send an argument to the Go routine function is to ensure this value is copied before we excute logic on it inside our Go routine.
  • If we don't do that then that's value might change between while loop and another.
  • So we have to pass it as an argument in order to copy it and presisted???? inside our function code.
  • Sending the messages on multiple client channels on different Go routines ensures that if there is a problem which one of the clients receiving the message then that it would not affect the other clients.
  • Because let's say for example if our chat client code is not ready to read from this channel, then if we dont have this code running in go routine, this method will block till the channel is read
  • And that is why we need to run it on a separate go routine.

Addclient method


func (r *room) AddClient(c io.ReadWriteCloser) {  
    r.Lock()  
    wc, done := StartClient(r.Msgch, c, r.Quit)  
    r.clients[wc] = struct{}{}  
    r.Unlock()


    //remove client when done is signalled  
    go func() {  
    <-done  
    r.RemoveClient(wc)  
    }()
}
  • Now let's look at the code for adding a client to our chat room for the purpose of this exercise.
  • Let's ?????? clients connects via TCP to the chat server.
  • This will make the chat server at TCP server while the chat clients TCP clients.
  • In here We will use a Write Lock as shown to protect our client map.
  • Well we start a new client and add a client to the client map.
  • The argument to a AddClient c, we represent a client connection.
  • We will see how to use it later.
  • StartClient is actually a channel generator function.
  • It will take the room object message channel as well as our client connection which is an argument passed here as client, as well as our chat room Quit channel and it will return a channel which represents the write channel that this particular client will be listening on and a done signal.
  • A done signal indicates of the client is being closed.
  • We will see how StartClient is implemented a little later.
  • Adding a new client to the client map is the same as adding a key to the client's map.
  • So the value is empty because we only care about the key.
  • Because clients have is practically a set.
  • The second half of the AddClient method will start a new go routine which will block by listening on the done channel.
  • So as I mentioned earlier that done channel is basically a signal which will be only received when the client is no longer part of our chat room.
  • Then whenever we will get a signal on the done channel, we will remove this client from the chat room as it's no longer relevant.
  • This is a nice design Approaching in go which allows us to clean up after the client is done without having to write complex ogic elsewhere to do that.

Remove client method


func (r *room) RemoveClient(wc chan<- string) {  
    logger.Println("Removing client")  
    r.Lock()  
    close(wc)  
    delete(r.clients, wc)  
    r.Unlock()  
    select {  
    case <-r.Quit:  
    if len(r.clients) == 0 {  
        close(r.Msgch)  
        }  
    default:  
    }  
}
  • Now let's discuss our RemoveClient method.
  • We will use this method to remove a client which is represented by the clients write channel from our client maps or set
  • We will use our write locks here, because we're actually moving items from the client's map.
  • So inside the lock will start by closing the channel once and for all using the close keyword.
  • And then we'll use the delete key word to delete the client or our write channel representing the client from the client's map.
  • After we're done with the locking, we will use a select statement to check whether our room quit channel had been signaled or not.
  • If so and there are no clients left, we will close the main message channel of our chat room.
  • We do this since this will be the last client for removing from the chat room.
  • So we're cleaning our chat room.
  • The default key here is empty.
  • The reason why we use the default keys here inside our select statement is to prevent the select statement from blocking our code is that Quit channel is not ready yet.
  • So this code will not block.
  • It will execute normally and the most client will return to the caller.
  • One important remark is that when we close the main message channel, then the for range loop that used to listen to the message channel which was triggered by our room.Run method will exit.
  • So this go routine will return when the channel closes.

clients.go

  • Now lets look at the clients to go file.
  • This is where we will write that behavior of the chat clients.
  • So start by importing bufio and io packages.
  • We will see how we use them as we go.
  • type client struct { *bufio.Reader *bufio.Writer wc chan string }
  • And that will create our client struct which we'll present our chat client object.
  • It will embed the bufio.Reader type pointer and bufio.Writer pointer type.
  • And it will also include channel of type string which will be the write channel which this client will listen to.

StartClient


func StartClient(msgCH chan<- string, cn io.ReadWriteCloser, quit chan struct{}) chan<- string {  
    c := new(client)  
    c.Reader = bufio.NewReader(cn)  
    c.Writer = bufio.NewWriter(cn)  
    c.wc = make(chan string)  
    done := make(chan struct{})

    //setup the reader  
    go func() {  
    scanner := bufio.NewScanner(c.Reader)  
    for scanner.Scan() {  
        logger.Println(scanner.Text())  
        msgCH <- scanner.Text()  
        }  
    done <- struct{}{}  
}()

// setup the writer  
c.writerMonitor()

go func() {  
    select {  
        case <-quit:  
            cn.Close()  
        case <-done:  
        }  
    }()  
return c.wc, done
}
  • Now its time to explore the Start client code.
  • So the start client code is what we called when we added a new client here to our chat room.
  • StartClient will take three arguments the first argument will be the message channel of the chat room that the client is supposed to connect to.
  • The second argument is supposed to be the actual client connection between the chat client and that chat server.
  • And the third argument will be the quit channels will presenting the close of the chat room.
  • First thing we do is to create a new client object.
  • We use the keyword new because we will get a pointer to it or a reference to it.
  • Then we will set the reader and the writers for this client.
  • So the reader here is embedded reader type bufio.Reader type.
  • The writer here is embedded buffer.Writer type.
  • As see we initialize them using the client connection that was passed as an argument of StartClient.
  • The client connection with the argument that we originally received from the AddClient.
  • We used bufio here so that we can obtain buffered readers and buffered writers for our messages on this connection channel.
  • As I've mentioned before.
  • Let's assume for the exercise that this is a TCP connection to be of type ReadWriteCloser interface.
  • Those readers and writers will allow us to efficiently send than read messages on the network connection.
  • We will then create our client write channel as shown.
  • So we will use the make keyword and we're creating a channel type string.
  • This is where the client messages will leave.
  • And we'll also create the done channel which if you remember earlier we'll represent the signal that will get sent when the client stops.
  • The done channel will be used to inform the room object.
  • The chat room object that it needs to clean up this client.
  • Next we will evoke a new go routine where we will set up a scanner of the network connection buffer.
  • Scanner is a type in the bufio package which allows us to efficiently read buffer data.
  • We can use scanner.scan to read data as it comes.
  • Scanner.Text is a way to convert the receive data to a string.
  • We will initialize our scanner on the reader buffer of our client the object.
  • The reader buffer were presented TCP channel that our client is sending the data on to our chat server.
  • We will then use a for loop with a method called scanner.scan which will listen to data coming through the TCP channel encapsuletd in our scanner.
  • Whenever a data is received, we'll get inside our for loop.
  • So this is the part where our client sends the message that it receives via TCP or like our underlying connection between the client.server to the chat room message channel.
  • Now as long as a connection buffer is healthy this for loop will stay alive.
  • Then once the connection goes down for any reason, this for loop will exit.
  • When the for loop exits, We'll get this piece of code and this is when we send the signal, In this case we're just sending an empty value because we don't need to send the specific values.
  • Just need to send anything on the done channel.
  • So that the chat room object listening to the done channel can realize that the client is closing down that the underlying connection of this client is no longer valid.
  • The other statement we hadn't said our for loop was just the logging statement.
  • We can use it for troubleshooting.
  • So when we receive a message on the client connection or the connection opened to a chat client, we should receive some text here some strings here which is what we can represent by scanne.Text.
  • The scanner object is an important part of the bufio package.
  • Next we will call a method called writeMonitor which will take care of writing to the underlying network buffer.
  • Whenever a message needs to be consumed by this client object.
  • We'll discuss this method very shortly.
  • We'll then create another go routine to watch for the room chat through quit channel.
  • Whenever the quit channel gets signaled, We will close the underlying connection of our client to clean up resources.
  • If done happens before the quit channel happens.
  • Then we will exit the select which is why we will leave this done blank the done key is blank.
  • Effectively exiting the go routine.
  • Now at the end of our main function we will return our client write channel which we created here as well as the done channel.
  • And those were the two values that we used in our chat room code

writeMonitor


func (c \*client) writerMonitor() {  
    go func(){  
        for s:= range c.wc {  
            //logger.Println("Sending", s)  
            c.WriteString(s + "\\n")  
            c.Flush  
            }  
        }()  
    }
  • In the write monitor which if you remember we set here in our StartClient code.
  • And you go routine will be created which will listen to the client write channel using the for range statement.
  • Once a message is ready in the client write channel, we'll write to the underlying client network buffer.
  • So WriteString and Flush are part of the bufio Writer.
  • The reason why we can call them directly from the client object is because we embed this type as seen here in the client struct type.

Summary

  • So let's explore the pieces of our code that are present that the pipeline pattern.
  • The first stage of our pipeline pattern was when we started a new client, and then whenever we send a message received by the client to the a chat room message channel which was passed as an argument.
  • The second stage of our pipeline was when we receive a message on the message channel which belongs to our chatroom object.
  • We then broadcast it as shown here to all other clients connected to our chat room.
  • The last stage of our pipeline was in the writeMonitor where whenever we receive a message on the client write channel.
  • We would then write it back to the client.
  • So for example if we have say five clients connected to our chat room we will send the message received to our chat room to the write channel of each one of those clients.
  • And then on each client there is a writerMonitor go routine that is running.
  • Listening to the clients write channel.
  • So whenever a value is received on each client's write channel, the message will be written to the underlying network buffer of the client object using the write string method.
  • Flush is used to flush the data.
  • We will add a Slash end to our message to indicate an end of line for our message.
  • We covered pipelines and how it fits in software design.
728x90
반응형

관련글 더보기

댓글 영역