読者です 読者をやめる 読者になる 読者になる

初老のボケ防止日記

おっさんのひとりごとだから気にしないようにな。

スポンサーリンク

いまからのGo(4)

Go言語


レリGo- レリGo-
本を読んでも実際に何か作ってみないと全く理解が進まないのでサンプルを組みながら理解していくコーナー其の4。

前回迄のあらすじ

osa030.hatenablog.com

前回はECHOクライアント側をパッケージ化したのだが、今回はビッグECHOサーバ側。

環境

前回通り。

OS Windwos 10 Pro
Go 1.6.2
IntelliJ IDEA 2016.2
Goプラグイン 0.12.1724
GB 0.4.x

構成

今迄の通り、IDEAのプロジェクトフォルダを「pachimongo」とする。

$ cd pachimongo/
$ tree -d
.
`-- src
    |-- cmd
    |   |-- client
    |   `-- server
    `-- common
        `-- net
            `-- echo

IDEAで補完を有効化するために「pachimongo/src」を「Go Libraries」に追加するのを忘れずに。

手順は以下参照

osa030.hatenablog.com

「common/net/echo」パッケージ

ECHOセッション

ECHOサーバに接続してきたECHOクライアントとのTCPコネクションを扱う為の構造体とメソッド。

  • EchoSession構造体
type EchoSession struct {
	id     string
	conn   net.Conn
	close  chan struct{}
	closed chan string
}

「id」は複数のセッションを一意に特定するためのもので、EchoServerクラスが利用する。「closed」はどのセッションが切断したかを特定できるようにstringに変更してidを通知するようにしている。

  • コンストラクタ
func newEchoSession(conn net.Conn, closed chan string) *EchoSession {
	return &EchoSession{
		id:     fmt.Sprintf("%s-%s", conn.RemoteAddr(), conn.LocalAddr()),
		conn:   conn,
		close:  make(chan struct{}, 1),
		closed: closed,
	}
}

で、コンストラクタ。IDはクライアント側の「ip:port」とサーバ側の「ip:port」にしているがサーバ側は同じだから要らない気もする。「closed」を引数渡しにしているのは理由があって、通知を受信する側(EchoServer側)で、複数のECHOセッションのチャネルを監視するのは少しコツがいるので、今回はEchoServer側で宣言したチャネルを複数のECHOセッション間で共有するという形にした。ここについてはまた今度記事をかけたらいいかなと思っている。

  • セッションメインルーチン
func (es *EchoSession) handle(wg *sync.WaitGroup) {
	defer func() {
		log.Println("disconnect from:", es.id)
		es.conn.Close()
		es.conn = nil
		es.closed <- es.id
		wg.Done()
	}()

MESSAGE_LOOP:
	for {
		select {
		case <-es.close:
			break MESSAGE_LOOP
		default:
			es.conn.SetReadDeadline(time.Now().Add(ReadTimeOut))
			msg, err := readMessage(es.conn)
			if err != nil {
				if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
					continue
				}
				log.Println("ERROR:", err)
				break MESSAGE_LOOP
			}
			log.Printf("RECV[%s]\n", msg)

			resp := strings.ToUpper(msg)

			err = writeMessage(es.conn, resp)
			if err != nil {
				log.Println("ERROR:", err)
				break MESSAGE_LOOP
			}
			log.Printf("SEND[%s]\n", resp)
		}
	}
}

これがECHOセッションのメインルーチン。やってることはTCPコネクション経由でクライアントから受信した文字列を大文字にして応答するだけ。チャネル経由したEchoServerとのやりとりは切断関連のみ。引数の「sync.WaitGroup」についてはEchoServer側で説明。

  • セッション切断
func (es *EchoSession) Close() {
	if es.conn != nil {
		es.close <- struct{}{}
	}
}

EchoServerの終了時に生存中のセッションを切断する為のメソッド。チャネルを通じてECHOセッションメインルーチンに切断を指示する。切断完了の待機がないのは今回はsync.WaitGroupを使っているからです。

  • session.go

全体を通した処理はコチラ。

package echo

import (
	"fmt"
	"log"
	"net"
	"strings"
	"sync"
	"time"
)

type EchoSession struct {
	id     string
	conn   net.Conn
	close  chan struct{}
	closed chan string
}

func newEchoSession(conn net.Conn, closed chan string) *EchoSession {
	return &EchoSession{
		id:     fmt.Sprintf("%s-%s", conn.RemoteAddr(), conn.LocalAddr()),
		conn:   conn,
		close:  make(chan struct{}, 1),
		closed: closed,
	}
}

func (es *EchoSession) handle(wg *sync.WaitGroup) {
	defer func() {
		log.Println("disconnect from:", es.id)
		es.conn.Close()
		es.conn = nil
		es.closed <- es.id
		wg.Done()
	}()

MESSAGE_LOOP:
	for {
		select {
		case <-es.close:
			break MESSAGE_LOOP
		default:
			es.conn.SetReadDeadline(time.Now().Add(ReadTimeOut))
			msg, err := readMessage(es.conn)
			if err != nil {
				if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
					continue
				}
				log.Println("ERROR:", err)
				break MESSAGE_LOOP
			}
			log.Printf("RECV[%s]\n", msg)

			resp := strings.ToUpper(msg)

			err = writeMessage(es.conn, resp)
			if err != nil {
				log.Println("ERROR:", err)
				break MESSAGE_LOOP
			}
			log.Printf("SEND[%s]\n", resp)
		}
	}
}

func (es *EchoSession) Close() {
	if es.conn != nil {
		es.close <- struct{}{}
		<- es.closed
	}
}

ビッグECHOサーバ

さていよいよビッグなECHOサーバさんです。

  • EchoServer構造体
type EchoServer struct {
	listener *net.TCPListener
	close    chan struct{}
	closed   chan struct{}
}

ECHOクライアントからの接続を待ち受けているListenerと、切断関連のチャネルのみ。シンプルである。

  • コンストラクタ
func NewEchoServer(service string) (*EchoServer, error) {

	tcpAddr, err := net.ResolveTCPAddr("tcp", service)
	if err != nil {
		return nil, err
	}

	tcpListener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		return nil, err
	}

	return &EchoServer{
		listener: tcpListener,
		close:    make(chan struct{}, 1),
		closed:   make(chan struct{}, 1),
	}, nil
}

コンストラクタといいつつエラーも返却できるので、内部でTCP待受までしている。なお、今まではnet.Listenで待受していたのだけれも型アサーションしなくてもいいように今回はnet.ListenTCPを使っている。

  • サーバ起動
func (es *EchoServer) Start() {
	go es.handle()
}

メインルーチンをGoルーチンとして起動するだけの処理。コンストラクタ内でGoしてないのは気分です。

  • サーバメインルーチン
func (es *EchoServer) handle() {

	sessions := make(map[string]*EchoSession)
	wg := sync.WaitGroup{}
	closed := make(chan string, EchoSessionMax)

	defer func() {
		log.Println("close listener:", es.listener.Addr())
		es.listener.Close()
		for id, session := range sessions {
			log.Println("force close session:", id)
			session.Close()
		}
		wg.Wait()
		es.closed <- struct{}{}
	}()

ACCEPT_LOOP:
	for {
		select {
		case <-es.close:
			break ACCEPT_LOOP
		case id := <-closed:
			log.Println("session closed:", id)
			delete(sessions, id)

		default:
			es.listener.SetDeadline(time.Now().Add(AcceptTimeout))
			conn, err := es.listener.Accept()
			if err != nil {
				if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
					continue
				}

				log.Println("ERROR:", err)
				break ACCEPT_LOOP
			}

			session := newEchoSession(conn, closed)
			sessions[session.id] = session
			log.Println("new session:", session.id)
			wg.Add(1)
			go session.handle(&wg)
		}
	}
}

ECHOサーバのメインルーチン。といっても、新規のTCPコネクションはECHOセッションに委ねるので、処理自体はTCP接続を待ち受けるだけというシンプルさ。今回はmapを使ってセッションを管理することで、メインルーチン終了時に生存中のセッションは切断処理が行えるようにしている。
今回用いたsync.WaitGroupというのはJavaでいうCountDownLatchというやつで、Add()した数が0になるまでWait()がブロックする。EchoSessionのhandle()をGoルーチン起動する時にAdd()して、EchoClient.handle()が抜ける時にDone()でデクリメントしている。という訳で複数のGoルーチンの終了待ち合わせをひとつの変数で行える使える便利なものなのである。

sync - The Go Programming Language

  • サーバ停止
func (es *EchoServer) Stop() {

	es.close <- struct{}{}
	<-es.closed
}

TCP接続をクローズしてGoルーチン停止を待ち受けている。

  • server.go

全体を通した処理はコチラ。

package echo

import (
	"log"
	"net"
	"sync"
	"time"
)

const (
	EchoSessionMax = 1000
	AcceptTimeout  = time.Millisecond * 100
)

type EchoServer struct {
	listener *net.TCPListener
	close    chan struct{}
	closed   chan struct{}
}

func NewEchoServer(service string) (*EchoServer, error) {

	tcpAddr, err := net.ResolveTCPAddr("tcp", service)
	if err != nil {
		return nil, err
	}

	tcpListener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		return nil, err
	}

	return &EchoServer{
		listener: tcpListener,
		close:    make(chan struct{}, 1),
		closed:   make(chan struct{}, 1),
	}, nil
}

func (es *EchoServer) handle() {

	sessions := make(map[string]*EchoSession)
	wg := sync.WaitGroup{}
	closed := make(chan string, EchoSessionMax)

	defer func() {
		log.Println("close listener:", es.listener.Addr())
		es.listener.Close()
		for id, session := range sessions {
			log.Println("force close session:", id)
			session.Close()
		}
		wg.Wait()
		es.closed <- struct{}{}
	}()

ACCEPT_LOOP:
	for {
		select {
		case <-es.close:
			break ACCEPT_LOOP
		case id := <-closed:
			log.Println("session closed:", id)
			delete(sessions, id)

		default:
			es.listener.SetDeadline(time.Now().Add(AcceptTimeout))
			conn, err := es.listener.Accept()
			if err != nil {
				if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
					continue
				}

				log.Println("ERROR:", err)
				break ACCEPT_LOOP
			}

			session := newEchoSession(conn, closed)
			sessions[session.id] = session
			log.Println("new session:", session.id)
			wg.Add(1)
			go session.handle(&wg)
		}
	}
}

func (es *EchoServer) Start() {
	go es.handle()
}

func (es *EchoServer) Stop() {

	es.close <- struct{}{}
	<-es.closed
}

ビッグECHOサーバ(main)

「common/net/echo」のEchoServerを用いたECHOサーバ。

package main

import (
	"common/net/echo"
	"log"
	"os"
	"os/signal"
)

func main() {

	service := ":5555"

	log.Println("start server", service)
	defer log.Println("stop server")

	echoServer, err := echo.NewEchoServer(service)
	if err != nil {
		log.Fatal(err)
	}

	echoServer.Start()
	defer echoServer.Stop()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig
}

パッケージ側に処理を移行したのでスッキリサッパリしておりますが、os.Interruptで終了するまでECHOサーバが待受を続けてセッションもよしなに処理されるという訳です。

ということで

なんということでしょう。とても綺麗なコードになりました。

でもまだもう少し改善できそうな気がしている(続く)。

プログラミング言語Go (ADDISON-WESLEY PROFESSIONAL COMPUTING SERIES)

プログラミング言語Go (ADDISON-WESLEY PROFESSIONAL COMPUTING SERIES)

スターティングGo言語

スターティングGo言語

プログラミング言語Goフレーズブック

プログラミング言語Goフレーズブック

  • 作者: David Chisnall,デイビッド・チズナール,柴田芳樹
  • 出版社/メーカー: ピアソン桐原
  • 発売日: 2012/10/04
  • メディア: 単行本(ソフトカバー)
  • 購入: 1人 クリック: 5回
  • この商品を含むブログを見る

スポンサーリンク