상세 컨텐츠

본문 제목

Section 5.5.27 Pipelines – Build the Hydra Chat System (Continued)

Go/Mastering Go

by Gopythor 2022. 3. 20. 05:21

본문

728x90
반응형

Hydrachat.go

package hydrachat

import (
    "fmt"
    "net"
    "os"
    "os/signal"
    "syscall"

    "github.com/gopythor/udemy/Hydra/hlogger"
)

var logger = hlogger.GetInstance()

//Start hydra chat
func Run(conneciton string) error {
    l, err := net.Listen("tcp", connection)
    r := CreateRoom("HydraChat")
    if err != nil {
        logger.Println("Error connection to chat client", err)
        return err
    }

    go func() {
        //Handle SIGINT and SIGTERM.
        ch := make(chan os.Signal)
        signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
        <- ch

        l.close()
        fmt.Println("Closing tcp connection")
        close(r.Quit)
        if r.ClCount() > 0 {
            <- r.Msgch
        }
        os.Exit(0)
    }()

    for {
        conn, err := l.Accept()
        if err != nil {
            logger.Println("Error accepting connection from chat client", err)
            break
        }
        go handleConnection(r,conn)
    }

    return err
}

func handleConnection(r *room, c net.Conn) {
    logger.Println("Received request from client", c.RemoteAddr())
    r.AddClient(c)
}
  • Now let's explore the Hydrachat.go file which is underneath our Hydrachat folder.
  • This is where the chat TCP server will leave.
  • So we import several packages as shown.
  • We will use these packages one by one as we go.
  • And then after importing their packages we will initiate a new instance of the Hydra logger(hlogger) which we wrote in Section 4.
  • So we will start by writing a run function which will take us connection string as an argument.
  • And we return an error object.
  • The run function will implement the TCP chat server for the hydra's software system.
  • We use net.Listen in order to create a listener on the connection address string provided.
func CreateRoom(name string) *room {
    r := &room{
        name:    name,
        Msgch:   make(chan string),
        RWMutex: new(sync.RWMutex),
        clients: make(map[chan<- string]struct{}), 
        Quit:    make(chan struct{}),
    }
    r.Run()
    return r
}
  • Then we call create room which we wrote to create a chat room.
  • So CreateRoom was covered previously and then there we create a room object.
  • Then we run this room object.
  • Then we do our error checks by checking if there were any errors from the net.Listen if there are none then we log and return log the error.

Before

func Run(conneciton string) error {
    l, err := net.Listen("tcp", connection)
    r := CreateRoom("HydraChat")
    if err != nil {
        logger.Println("Error connection to chat client", err)
        return err
    }

}

After

func Run(conneciton string) error {
    l, err := net.Listen("tcp", connection)
    if err != nil {
        logger.Println("Error connection to chat client", err)
        return err
    }
    r := CreateRoom("HydraChat")
}
  • That is well actually a better way to do this would be to create the room after we ensure that no errors occurred like that.
  • That is because when we get to this point of the code then we know that the listener has been successfully created.
  • And then we can continue with the rest for our logic which starts by creating a chat room called HydraChat.
go func() {
    //Handle SIGNT and SIGTERM.
    ch := make(chan os.Signal)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    <- ch

    l.close()
    fmt.Println("Closing tcp connection")
    close(r.Quit)
    if r.ClCount() > 0 {
        <- r.Msgch
    }
    os.Exit(0)
}()
  • The next piece of code here is a go routine which we will use to clean up our resources and signal the exiting of that chat server.
  • The reason why go routine is because we don't want this piece of code here to block the execution of our run function code.
  • So in this go routine what we need to do is to monitor if our program is in the process of exiting.
  • Meaning that if we are running a micro service where this code is getting executed.
  • And then the micro service is getting an exit signal or like the service is shutting down or the program is closing.
  • Then we need to monitor this fact so that when we detect that program or the micro services closing, we started cleaning up our resources.
  • Now to detect if a program is in the process of exiting, We need to monitor a channel called os.Signal.
  • So the way how this works is first by creating a channel of type os.Signal.
  • And then we call function called signal.Notify.
  • And signal is here If you can look here. The import related to os/signal.
  • It's a package underneath the OS package folder.
  • So it belongs to go's standard packages.
  • And then Notify is a way to listen to operating system signals which are passed as arguments here.
  • And then trigger at channel with the OS signal observe.
  • When the OS throws the signal.
  • So if our OS throws an interrupt signal or a terminate signal which are both operating system signals which indicate the exiting of our program.
  • Signal.notify will pass the signal to the channel we created here.
  • And we can keep listening to the channel by just using the syntax.
  • So the syntax will block the code execution and this go routine till this channel gets a signal
  • And since this channel will only get a signal when one of those operating system signals get thrown.
  • Then we know that that code will never get to the next line of execution until our program starts to exit.
  • So when our program starts to exit first thing we do is close our listener by calling l.close.
close(r.Quit) 
if r.ClCount() > 0 {
    <- r.Msgch 
    }
  • Then we close our chat room and we do that by closing the quit channel in our chat room object which will cause a broadcasting signal to be sent to all the Go routines listening on the r.Quit channel.

clients.go

    go func() {
        select {
        case <-quit:
            cn.Close()
        case <-done:
        }
    }()
  • So as a quick refresher for go to Clients.go
  • We will see here that whenever we start a new client.
  • There was a go routine here.
  • That we'll be waiting for either the quit channel or the done channel.
close(r.Quit) 
if r.ClCount() > 0 {
    <- r.Msgch 
    }
  • So in our hydrachat.go code here.
  • When we close the quit channel and say done haven't been signaled, then that means that quit will get signaled and the channel will get closed.
  • Channel here is basically the TCP channel that will present our client connection to the chat server.
  • So when this gets closed, A sequence of events will start which will ultimately close our client

clients.go

//setup the reader
go func() { 
    scanner := bufio.NewScanner(c.Reader) 
    for scanner.Scan() {
    logger.Println(scanner.Text())
    msgCH <- scanner.Text()
    }
    done <- struct{}{}
}()
  • So if we're to follow it really quick, we will see here that first thing would happen would be the our scanner which is listening on the channel reader would return false.
  • Because it will stop scanning as it will detect that the reader is no longer valid because the channel had been closed.
  • And when it returns false then this for loop exits.
  • And when this for loop exits, the done signal'll get a value an empty struct here.
  • Because we don't need an actual value we just need to trigger a signal on the done Channel.

rooms.go

func (r *room) AddClient(c io.ReadWriteCloser) {
    r.Lock()
    wc, done := StartClient(r.Msgch, c, r.Quit)
    r.clients[wc] = struct{}{}
    r.Unlock()

    //remove client when done is signalled
    go func() {
        <-done
        r.RemoveClient(wc)
    }()
}

func (r *room) RemoveClient(wc chan<- string) {
    logger.Println("Removing client")
    r.Lock()
    close(wc)
    delete(r.clients, wc)
    r.Unlock()
    select {
    case <-r.Quit:
        if len(r.clients) == 0 {
            close(r.Msgch)
        }
    default:
    }
}
  • Now when the done channel gets a signal here then this code will get executed.
  • And this was a go routine which we had started when the AddClient method was called on the chat room.
  • And what Remove client would do when to do the necessary work to ensure the resources are clean and the client had been removed.
  • And then it will look into the Quit signal again.
  • And if the quit signal is valid, and at the same time our clients map have been emptied then we will close the chat room main message channel.
  • This was a channel used by the Chatroom object in order to share messages between clients.
  • This shows you the power of the go routines and the channel design patterns.
  • You can control a sequence of events just by triggering a channel closed.
  • Ultimately r.clients will become empty because every single go routine listening to r.Quit with call the RemoveClient method which would cause our r.clients map to get emptied one item at the time.
  • We do that so that we ensure we never exit our program which is done by os.Exit.
  • Before everything is cleaned up, os.Exit is a function we have from the OS package which then exit our current program or our micro service this case.
  • It takes a status ????? code as an argument and code zero indicates success, non-zero indicates an error.
  • Now that we're done with a clean up go routine logic.

hydrachat.go

//Start hydra chat
func Run(conneciton string) error {
    l, err := net.Listen("tcp", connection)
    if err != nil {
        logger.Println("Error connection to chat client", err)
        return err
    }
    r := CreateRoom("HydraChat")
    go func() {
        //Handel SIGNT and SIGTERM.
        ch := make(chan os.Signal)
        signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
        <- ch

        l.close()
        fmt.Println("Closing tcp connection")
        close(r.Quit)
        if r.CLCount() > 0 {
            <- r.Msgch
        }
        os.Exit(0)
    }()
        for {
        conn, err := l.Accept()
        if err != nil {
            logger.Println("Error accepting connection from chat client", err)
            break
        }
        go handelConnection(r,conn)
    }

    retrun err
    }
}
  • Let's move on to see what we do when we receive client connections to our TCP chat server.
  • So we will put our code here inside a for loop that exists on the same go routine as a run function.
  • And by doing this we make the run function a blocking function meaning that it would not return for a while till the for loop breaks.
  • Writing TCP listeners is pretty straightforward in go.
  • We covered that TCP listeners more details in section 3.
  • When we went through the net package.
  • Let's see that in action.
  • So first thing we do Accept an incoming connection is to call the Accept method which belongs to the listener object.
  • L here is a listener object which we get from net.Listen.
  • So this(l.Accept()) is a blocking call that will return on either an error or whenever we accept an new tcp connection meaning client trying to connect to our chat server.
  • So if there are any errors we log the error and we break effectively breaking this for loop.
  • Other ways we handle the incoming client connection.
  • We do that by calling a function called handleConnection which will take a chat room object as an argument as well as TCP connection object as the second argument.

handleConnection

func handleConnection(r *room, c net.Conn) {
    logger.Println("Received request from client", c.RemoteAddr())
    r.AddClient(c)
}
  • The handleConnection gets called on a separate go routine so that it does not disturb this for loop.
  • Let's explore handle connection.
  • handleConnection is a very simple function.
  • First thing we do is log the fact that we received a request from clients.
  • And then we will print out the address of the remote client.
  • So that we know which IP address is trying to connect our chat server.

AddClient

func (r *room) AddClient(c io.ReadWriteCloser) {
    r.Lock()
    wc, done := StartClient(r.Msgch, c, r.Quit)
    r.clients[wc] = struct{}{}
    r.Unlock()

    //remove client when done is signalled
    go func() {
        <-done
        r.RemoveClient(wc)
    }()
}
  • And then the next thing we do is effectively add the client to our chat room.
  • And this is done by the Addclient method.

main.go

package main

import (
    "bufio"
    "fmt"
    "log"
    "math/rand"
    "net"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    name := fmt.Sprintf("Anonymous%d", rand.Intn(400))
    fmt.Println("Starting hydraChatClient....")
    fmt.Println("What's your name?")
    fmt.Scanln(&name)



    scanner := bufio.NewScanner(os.Stdin)
    for scanner.Scan() && err == nil {
        msg := scanner.Text()
        _, err = fmt.Fprint(conn, name+msg+"\n")
    }
}
  • It's now time to write a chat TCP client to test our code.
  • So I added a folder here called hchatclient which stands for hydrachat underneath the hydrachat folder.
  • There is a main.go file which is where our chat client code will live.
  • So this will be simple executable that will act as a chat client for the Hydra software system.
  • Since it's an executable the package will be made and will import a bunch of packages here for our usage.
  • We'll start by creating a name string with the word anonymous followed by a random integer that is less than 400.
  • The client will then simply ask the user for the name and then we will use fmt.Scanln to retrieve that name.
  • If for some reason we weren't able to scan the name, we will use the Anonymous name that we assigned earlier.
  • This name string will be attached to any message sent to the chat server.
  • This is so that the person doing that chatting will be known.
    fmt.Printf("Hello %s, connecting to the hydra chat system.... \n", name)
    conn, err := net.Dial("tcp", "127.0.0.1:2300")
    if err != nil {
        log.Fatal("Could not connect to hydra chat system", err)
    }
    fmt.Println("Connected to hydra chat system")
    name += ":"
    defer conn.Close()
    go func() {
        scanner := bufio.NewScanner(conn)
        for scanner.Scan() {
            fmt.Println(scanner.Text())
        }
    }()
  • Next we will use net.Dial to connect to the chat server address in here .
  • I just have hard coded.
  • Because it's a just testing client.
  • And of course It is TCP.
  • And basically this code will allow this program to connect via TCP to the chat server which should be listening on this address.
  • Then this call will return a connection object and an error object.
  • If the error object is not nil, then we know something have gone wrong.
  • And we use log.Fatal to log an error message then exits.
  • Otherwise we'll get to this part of the code.
  • We print the fact that we are now connected to the hydra chat system.
  • Then we add a semicolon to the name.
  • To separate the person name from the message actually being sent.
  • Next thing we know is called for conn.Close() to ensure the connection is closed at the end.
  • Then we start a new go routine where we will listen to incoming messages from the TCP connection which we have open to the chat server.
  • So same as a earlier We use bufio.NewScanner to create a scanner which will then allow us to keep listening on a for loop to incoming messages till scanner.Scan would return false.
  • And then whenever scanner.Scan would get a message we will printed out on the standard output.
  • So that the user of this software will be able to see the incoming message from the chat room.

NewScanner

func NewScanner(r io.Reader) *Scanner {
    return &Scanner{
        r:            r,
        split:        ScanLines,
        maxTokenSize: MaxScanTokenSize,
    }
}
  • Creating a bufio.NewScanner on the TCP connection object was made possible because a new scanner takes on io.Reader type as an argument.

Conn in Net.go

type Conn interface {
    // Read reads data from the connection.
    // Read can be made to time out and return an error after a fixed
    // time limit; see SetDeadline and SetReadDeadline.
    Read(b []byte) (n int, err error)
    }
  • And at the same time the conn type implements the IO reader interface by taking care of the read method.
  • Again it shows us the power of go's interfaces.
  • Now we put this logic inside the go routine so that it will not block the code excution for our program.
scanner := bufio.NewScanner(os.Stdin) 
for scanner.Scan() && err == nil {
    msg := scanner.Text() 
    _, err = fmt.Fprint(conn, name+msg+"\n") 
}
  • That is because we still need to implement the piece of code that will handle a sending a message to the chat server.
  • And this is done by creating yet another scanner but this time on the standard input.
  • Now this scanner will allow our program to read messages left by the user of the software.
  • So never the user of the software would enter a message then hit enter.
  • We will get this ??????? of this message stored in a variable called message.
  • And then will use the power of fmt.Fprintf to write the contents of our message into the TCP connection.
  • Effectively sending this combined string to the chat server.
  • The combined string will be the combination of the name of the person sending the message.
  • And the actual contents of the message combined with an end of line character.
func Fprint(w io.Writer, a ...any) (n int, err error) { 
    p := newPrinter() 
    p.doPrint(a) 
    n, err = w.Write(p.buf) 
    p.free() 
    return 
}
  • fmt.Fprtintf usage was possible because conn implements the IO writer interface and the IO writer interface is accepted as the first argument Fprtintf.
  • And because of that This code is legal.

hchatservertest.go

func main() {
    hydrachat.Run("127.0.0.1:2300")
}
  • To simplify our code, Let's create another program that would act as our chat server.
  • So create another folder called hcahtservertest and in there.
  • Let's create hchatservertest.go file.
  • This will be a simple program, so to be of package main.
  • Main we will write the main function and all the main function needs to do is call the run function of our hydra chat server with the connection string.
  • So the run function belongs to the hydrachat package.
  • So let's try that if I do hydrachat here the package gets imported.
  • Then we will call Run.
  • And then Run should listen to the other end of the TCP channel that our client will try to connect to.
  • So this was on local port 2300.
  • So lets just do that.
  • Local port 2300.
  • So this should serve our needs.

hydrachat.go

    for {
        conn, err := l.Accept()
        if err != nil {
            logger.Println("Error accepting connection from chat client", err)
            break
        }
        go handleConnection(r, conn)
    }
  • And again because our run function is a blocking call thanks to this for loop then we know that our program will not exit.
  • As it's trying to accept new connections.
  • Now it's time to test out this code.
  • Now let's test our code.
  • So let's open up the folders here in the terminal.
  • So I opened one terminal for the server folder and opened three terminals for the client folders.
  • And with this we should be able to run one server and three chat clients.
  • So let's start with the server.

  • So again use awesome go run commands hchatservertest.
  • And we see no error messages.
  • Means our server is running.
  • Now let's run our clients.
  • So go run.
  • So should be chat client.
  • So as we've programmed our client to do is now asking us about our name.
  • So let's so for this session I'll call myself Mina.
  • Connecting now is connected.
  • Now let's use Jim now for the second chat client.
  • Jim is connected.
  • Finally lets go is maybe Caro third name.
  • So if everything works as expected, we should have three persons now in the Hydra system chat room.
  • So let's start our chat messages and say Hello this is Mina.
  • So " Hello this is Mina" got sent.

  • We see here the logging for our server says that this message was received.
  • And now if we look at Jim and Caro's sessions, we see Mina's message.
  • So both Jim and Caro received the message.
  • What if Jim sends a message

  • Hello Mina this is Jim enter.
  • Mina gets a message.
  • Caro gets a message.
  • And Jim sees his message.
  • Finally let's try out Caro.

  • Caro says this is Caro.
  • Everybody gets a message.
  • Our chat system works as expected.

Fan-in

An operation where data on multiple channels is accumulated into one channel for a final destination.

  • Now let's discuss Fan-ins and fan-outs which are very important to understand and the Go language in order to write powerful concurrent programs.
  • They are to Golang and concurrency design patterns that we can use in complex Go programs.
  • So let's start with the Fan-in.
  • Fan-in is basically the design pattern which involves taking a bunch of Go channels as an input.
  • And then combining their data into a single output channel which is then can be easily processed elsewhere in our code.
  • So again whenever we take multiple go channels, and then output a single channel that contains all their data we do a fan-in design pattern.
  • This is very useful when we have too many input channels.
  • And we want to process them in one place.
  • So it makes sense to consolidate them into a single output channel and then process this channel elsewhere.

fanin.go

package fanin

func fanin(chs ...<-chan int) <-chan int {
    out := make(chan int)
    for _, c := range chs {
        go registerChannel(c, out)
    }
    return out
}

func registerChannel(c <-chan int, out chan<- int) {
    for n := range c {
        out <- n
    }
}
  • So let's see here a quick example for piece of code that does fan-in design pattern.
  • So in here I created a sample package called Fanin put into a folder called fanin.
  • And then here there are two functions.
  • The first function is a fanin function which is where we do our actual faning in .
  • So the function takes multiple channels as arguments.
  • And then output a single channel which will contain the data we receive from any of the input channels.
  • So there's three dot initation??? here is called variadic function notation which means that this argument is actually a collection of arguments.
  • Each are of this type which is an end channel.
  • So the first thing we do here in our Fanin code is to create an output channel.
  • So we just make a channel of type event which is a type we selected for our sample code here.
  • And then we start out for range loop which will loop through our channel arguments.
  • And then for each channel we would register it with the output channel.
  • Let's see how that is done.
  • So we do that via the register channel function which we take an input channel.
  • And then output values from this input channel to an output channel simple.
  • And then we have a for range loop which will listen to data coming in via the C channel our input channel.
  • And then whenever we receive a value through this channel, we will output it to the out channel.
  • So when we register a channel for each of the provided input channel stored??? fanin function.
  • We will basically say that we want all values coming through these channels to show up on our output channel.
  • The reason why we use a go routine here is because this is a blocking call.
  • So we need to run this code concurrently on separate go routines for each one of our input channels.
  • And then at the end, we return our output channel.

Fan-out

An operation where data on one channel is distributed among multiple channels for concurrent processing

  • Now the opposite of the fan-in design pattern is a fan-out design pattern which is when we take data from a single input channel and then distribute this data amongst multiple output channels.
  • This is a powerful pattern to use when we need to distribute our data route which is coming through an input channel into multiple output channels where each piece of data will be processed individually on a separate channel maybe in a separate group routine.
  • This technique was used in our chat server in order to distribute the messages coming to our chat server to the different clients already connected to our chat server.

rooms.go

func (r *room) Run() {
    logger.Println("Starting chat room", r.name)
    go func() {
        for msg := range r.Msgch {
            r.broadcastMsg(msg)
        }
    }()
}

func (r *room) broadcastMsg(msg string) {
    r.RLock()
    defer r.RUnlock()
    fmt.Println("Received message:", msg)
    for wc, _ := range r.clients {
        go func(wc chan<- string) {
            wc <- msg
        }(wc)
    }
}
  • So the fan out design pattern was implemented in the rooms.go file here close to the end.
  • So it started here at the run method where we were listening to any data coming to our chat room main message channel.
  • And then whenever we extract a string message from the message channel we broadcasted via a broadcast message method into all that connected client channels via this for loop.
  • So loops through all the client channels one by one and for each client channel, we spin a new Go routine which will pass the message delivered from the message channel into the client channel.
  • So powerful design pattern

Summary

  • We finished our fun journey of flighting the Hydra chat system.
  • We now have full functional chat system that makes good use of go's features and design patterns.
  • We will tackle and new mastered topic in the go universe which is reflection.
  • We'll Discuss the three laws of reflection that we have to be fully aware of when using the power of reflection in the Go language.
728x90
반응형

관련글 더보기

댓글 영역