1 概述

1.1 案例介绍

TCPGroupChat基于TCP协议实现了一个群聊系统的核心功能。程序启动后,会监听指定端口并持续等待客户端连接。每当有新客户端接入时,服务器会创建一个独立的线程专门用于接收该客户端发送的消息,并将这些消息暂存到一个无阻塞队列中。服务器主循环从队列中取出消息后,利用互斥锁保护成员列表,并向所有其他客户端广播该消息,同时自动清理掉线客户端连接。这种多线程加队列的设计实现了并发处理多个客户端的能力,确保了通信的实时性和稳定性。

客户端工作机制 客户端程序负责与服务器建立连接,并实现双向通信。一方面,它通过子线程持续监听来自服务器的消息,并将其解码后打印在控制台,以便用户查看;另一方面,主循环不断读取用户的键盘输入,并将每条输入内容发送给服务器,从而实现发送消息的功能。如果检测到网络异常或服务器断开连接,客户端会自动退出程序,以避免死循环和资源泄漏。这种设计保证了客户端能够稳定地收发数据,并及时响应连接状态变化。

TCPGroupChat体现了典型的C/S架构在网络通信中的应用,适用于需要集中管理、实时交互的场景,如即时通讯、在线协作等。服务器作为中心节点统一处理消息转发,既简化了客户端逻辑,又提升了系统的可扩展性。此外,使用多线程处理并发请求、结合同步机制保障数据一致性,是构建健壮网络服务的重要手段。本示例虽然为简易群聊系统,但其结构清晰、逻辑完整,对大家熟悉CodeArts IDE for Cangjie具备良好的教学价值和工程参考意义。

1.2 适用对象

  • 企业
  • 个人开发者
  • 高校学生

1.3 案例时间

本案例总时长预计40分钟。

1.4 案例流程

d071634d5777d6486d97c29f899a93c6.png

说明:

  1. 练习使用CodeArts IDE for Cangjie编辑器;
  2. 仓颉创建TCPGroupChat群聊并体验。

1.5 资源总览

资源名称 规格 单价(元) 时长(分钟)
华为开发者空间——云主机 鲲鹏通用计算增强型 kc2 | 4vCPUs | 8G | Ubuntu 免费 40

最新案例动态,请查阅 《迈出万物互联的一小步:仓颉版TCPGroupChat群聊实现》。小伙伴快来领取华为开发者空间,进入云主机桌面版实操吧!

2 运行测试环境准备

2.1 开发者空间配置

面向广大开发者群体,华为开发者空间提供一个随时访问的“开发桌面云主机”、丰富的“预配置工具集合”和灵活使用的“场景化资源池”,开发者开箱即用,快速体验华为根技术和资源。

领取云主机后可以直接进入华为开发者空间工作台界面,点击打开云主机 > 进入桌面连接云主机。

a1aae6ff53aac98855ef597dd6899967.png

552fc96c3b58a06e294e4a760ae719e3.PNG

2.2 创建仓颉程序

点击桌面CodeArts IDE for Cangjie,打开编辑器,点击新建工程,保持默认配置,点击创建

产物类型说明

  • executable,可执行文件;
  • static,静态库,是一组预先编译好的目标文件的集合;
  • dynamic,动态库,是一种在程序运行时才被加载到内存中的库文件,多个程序共享一个动态库副本,而不是像静态库那样每个程序都包含一份完整的副本。

28acbca9146a8a6aacbfdd4f6ac3791b.png

2.3 运行仓颉工程

创建完成后,打开src/main.cj,参考下面代码简单修改后,点击编辑器右上角运行按钮直接运行,终端窗口可以看到打印内容。

package demo
// 第一个仓颉程序
main(): Int64 {
    println("hello world")
    println("你好,仓颉!")
    return 0
}

(* 仓颉注释语法:// 符号之后写单行注释,也可以在一对 /* 和 */ 符号之间写多行注释)

d1e16b48f5f9620fafe9fc59ae62a367.png

到这里,我们第一个仓颉程序就运行成功啦!接下来我们继续探索仓颉语言。

3 仓颉实现TCPGroupChat群聊

3.1 创建聊天服务器

首先,我们要明确,想实现一个群聊服务器,那么服务器需要以下功能才能支持群聊:

1.需要应对可能出现的并发情况。

2.需要聊天界面进行输入输出

3.需要网络通信工具支持

4.群聊成员的信息需要同步更新处理

5.服务器需要实时中转群聊成员的消息

6.可以支持退出(关闭)群聊

当我们明确了我们的服务器应该具备的功能,那么我们就可以着手去写我们的服务器了。在CodeArts IDE for Cangjie中创建服务端源文件server.cj,下面对服务端代码拆分解释,完整server.cj代码再3.1章节末尾。

3.1.1 导入模块与全局配置
import std.collection.*
import std.collection.concurrent.*
import std.console.*
import std.socket.*
import std.sync.*
import std.time.Duration

const PORT: UInt16 = 24621
const BUFFER_SIZE = 1024
var g_quit = false

其中,collection.*为仓颉的基础集合库,collection.concurrent.*为并发安全所需要的集合库,std.console.*库用于支持控制台输入输出。std.socket.*模块用于支持网络通信,具备TcpSocket等工具。std.sync.*和std.time.Duration分别用于同步和时间处理。通过导入以上模块,我们导入了网络通信、线程同步的能力。

我们定义服务监听端口为24621,接收消息缓冲区大小为1024,可以修改,但是需要权衡内存与消息长度。并且定义全局退出标志g_quit = false,通过简单布尔值控制多线程退出。

3.1.2 控制台退出指令监听
func controller() {
    spawn {
        while (!g_quit) {
            let key = Console.stdIn.read().getOrDefault {r' '}
            if (key == r'q' || key == r'Q') {
                g_quit = true
            }
        }
    }
}

我们通过定义一个controller函数来控制退出行为。其中,通过while循环,后台线程持续监听用户输入,输入q/Q时终止服务。

通过spawn创建轻量级协程(划重点,后面要考),避免阻塞主线程。

3.1.3 TCP Socket扩展方法

在正常的网络通信中,我们不能保证通信连接时时刻刻都是理想状态,网络通信中连接可能随时中断,直接调用write可能导致程序崩溃,所以我们需要给TcpSocket添加扩展:

extend TcpSocket {
    public func tryWrite(data: Array<Byte>) {
        try { 
            this.write(data)  
        } catch (e: SocketException) {
            if (!this.isClosed())  { this.close()  }
            return false
        }
        return true
    }
} 

为了避免未处理异常导致程序崩溃,我们在TcpSocket扩展方法中对原生write方法进行包装,捕获 SocketException 异常(如连接中断、超时等),同时通过(if (!this.isClosed()) { this.close() }),当写入失败时主动关闭无效连接,防止后续无效操作占用资源。

3.1.4 主程序模块

首先,在主程序main()中进行初始化和资源创建

    let messages = NonBlockingQueue<(TcpSocket, Array<Byte>)>()// 线程安全消息队列
    let members = LinkedList<TcpSocket>() // 群聊成员-客户端
    let server = TcpServerSocket(bindAt: PORT) // 服务端
    server.bind()// 绑定端口
    controller()// 启动控制台监听

通过NonBlockingQueue无锁设计提升并发性能,存储待转发的messages(客户端, 消息)元组。通过LinkedList存储所有客户端连接。

之后我们构建客户端连接处理线程

// 和新的客户端建立连接
    let mutex = ReentrantMutex()
    let monitor = Monitor()
    spawn {
        while (!g_quit) {
            let client = server.accept()// 阻塞等待新连接
            synchronized (mutex) {
                members.append(client)// 线程安全添加成员
            }
            // 每个连接要处理的任务
            spawn {// 为每个客户端创建消息接收线程
                while (!g_quit) {
                    let data = Array<Byte>(BUFFER_SIZE, item: 0)
                    // 初始化缓冲区
                    try { client.read(data) } catch (e: SocketException) {
                        return // 遇到异常终止此线程,由转发线程清理资源
                    }
                    println(String.fromUtf8(data))
                    // 服务端日志打印
                    messages.enqueue((client, data))
                    // 消息入队
                    synchronized (monitor) { monitor.notify() }
                    // 唤醒转发线程
                }
            }
        }
    }

在这里我们为了要实现连接对应线程,需要通过双重spawn。同时,为了进一步优化效率,我们利用monitor.notify()通知转发线程有新消息,避免忙等待,进一步实现CPU优化。并且当捕捉到异常时,终止线程,由转发线程清理资源。

我们现在进行下一步,搭建消息转发以及连接维护模块,将接收到的消息广播给所有在线客户端,同时动态维护有效的客户端连接池。

// 转发消息到所有客户端,实现群聊
    while (!g_quit) {
        // 此处 Monitor 仅用于避免 while 空转,可以注释掉以下三行代码对比进程 CPU 消耗情况
        synchronized (monitor) {
            monitor.wait(timeout: Duration.millisecond * 100)            
        }
        while (let Some((client, data)) <- messages.dequeue()) {
            mutex.lock()
            // 遍历群聊成员,转发消息并清理无效客户端
            members.removeIf { socket =>
                if (refEq(client, socket)) { // 不必转发给消息源
                    return false
                }
                !socket.tryWrite(data)
            }
            mutex.unlock()
        }
    }

首先是外层循环控制 (while (!g_quit)),监听全局退出标志 g_quit。但是考虑到持续监听会有资源浪费,我们最理想的状态是当消息传过来时进行监听,其余时间不进行监听,所以我们可以使用Monitor 同步等待 (synchronized (monitor)),当消息队列为空时,通过 wait 让出CPU资源,降低无意义循环的消耗,避免cpu空转。接下来,我们需要完成消息转发部分,我们通过messages.dequeue() 从线程安全队列获取待转发消息同时为了确保模式匹配类型安全,我们使用Some((client, data))。为了保证对 members 链表的操作(遍历/修改)线程安全,避免并发修改异常,我们需要引用互斥锁mutex.lock() 。为了避免无效转发,进行选择性广播,在这里我们使用refEq(client, socket) 跳过消息发送者自身(避免回显)。通过tryWrite(data) 尝试写入,失败返回 false 触发移除逻辑。为了清理失效连接,removeIf 在遍历期间直接移除写入失败的客户端,保持连接池健康。

最后,我们来构建最后一个退出模块。

// 退出时通知所有客户端
    for (client in members) {
        if (client.tryWrite("Server Exit".toArray())) {
            client.close()
        }
    }
    server.close()

最后,我们来构建最后一个退出模块。通过for (client in members)

遍历客户端连接。通过client.tryWrite(“Server Exit”.toArray())发送退出通知。其中的tryWrite扩展方法可以确保写入安全。写入成功之后执行 close(),避免重复关闭已失效的连接。最后关闭服务器server.close()。好了,现在我们拥有了一个完整的客户端服务器:

// 群聊程序-服务器完整代码

// 群聊程序-服务器完整代码
import std.collection.*
import std.collection.concurrent.*
import std.console.*
import std.socket.*
import std.sync.*
import std.time.Duration

const PORT: UInt16 = 24621
const BUFFER_SIZE = 1024
var g_quit = false

// 从控制台读取退出命令
func controller() {
    spawn {
        while (!g_quit) {
            let key = Console.stdIn.read().getOrDefault {r' '}
            if (key == r'q' || key == r'Q') {
                g_quit = true
            }
        }
    }
}

extend TcpSocket {
    public func tryWrite(data: Array<Byte>) {
        try { this.write(data) } catch (e: SocketException) {
            if (!this.isClosed()) { this.close() }
            return false
        }
        return true
    }
}

main() {
    let messages = NonBlockingQueue<(TcpSocket, Array<Byte>)>()
    let members = LinkedList<TcpSocket>() // 群聊成员-客户端
    let server = TcpServerSocket(bindAt: PORT) // 服务端
    server.bind()
    controller()

    // 和新的客户端建立连接
    let mutex = ReentrantMutex()
    let monitor = Monitor()
    spawn {
        while (!g_quit) {
            let client = server.accept()
            synchronized (mutex) {
                members.append(client)
            }
            // 每个连接要处理的任务
            spawn {
                while (!g_quit) {
                    let data = Array<Byte>(BUFFER_SIZE, item: 0)
                    try { client.read(data) } catch (e: SocketException) {
                        return // 遇到异常终止此线程,由转发线程清理资源
                    }
                    println(String.fromUtf8(data))
                    messages.enqueue((client, data))
                    synchronized (monitor) { monitor.notify() }
                }
            }
        }
    }

    // 转发消息到所有客户端,实现群聊
    while (!g_quit) {
        // 此处 Monitor 仅用于避免 while 空转,可以注释掉以下三行代码对比进程 CPU 消耗情况
        synchronized (monitor) {
            monitor.wait(timeout: Duration.millisecond * 100)            
        }
        while (let Some((client, data)) <- messages.dequeue()) {
            mutex.lock()
            // 遍历群聊成员,转发消息并清理无效客户端
            members.removeIf { socket =>
                if (refEq(client, socket)) { // 不必转发给消息源
                    return false
                }
                !socket.tryWrite(data)
            }
            mutex.unlock()
        }
    }
    // 退出时通知所有客户端
    for (client in members) {
        if (client.tryWrite("Server Exit".toArray())) {
            client.close()
        }
    }
    server.close()
}

3.2 创建群聊客户端

既然我们已经实现了一个群聊服务器,那么同样,客户端也需要以下功能才能支持群聊:

1. 网络通信能力。

2. 需要聊天界面进行输入输出

3. 需要与服务器能够进行连接

4. 能够有退出群聊的能力

在CodeArts IDE for Cangjie中创建服务端源文件client.cj,下面对客户端代码拆分解释,完整client.cj代码再3.2章节末尾。

3.2.1 导入与常量定义
import std.socket.*
import std.console.*

const IP = "127.0.0.1"// 服务器地址(本地回环)
const PORT: UInt16 = 24621// 与服务端一致的端口
const BUFFER_SIZE = 1024// 接收缓冲区大小
var g_quit = false// 全局退出标志

其中,std.console.*库用于支持控制台输入输出。std.socket.*模块用于支持网络通信,具备TcpSocket等工具。通过导入以上模块,我们导入了网络通信、控制台交互的能力。

同时我们也定义IP和端口,与服务器保持一致。

3.2.2 主程序模块

先建立socket连接

let socket = TcpSocket(IP, PORT)
socket.connect()

在客户端中,需要读取服务器同步发来的信息。

// 读取服务器发来的消息
    spawn {
        while (!g_quit) {
            let data = Array<Byte>(BUFFER_SIZE, item: 0)
            var count = 0
            // 如果对端正常关闭连接,不会抛出异常,read 返回 0
            try { count = socket.read(data) } catch (e: SocketException) {
                g_quit = true
            }
            if (g_quit || count == 0) { break }
            println(String.fromUtf8(data))
        }
}

在这个功能中,我们需要先完成线程分离,即通过spawn将接收逻辑与用户输入解耦,避免阻塞主线程。再通过捕获SocketException并设置退出标志,防止线程僵死,完成异常处理模块。同时通过count == 0进行连接关闭检测。

消息发送模块

// 向服务器发送消息
    while (!g_quit) {
        let input = Console.stdIn.readln().getOrDefault {"\n"}
        try { socket.write(input.toArray()) } catch (e: SocketException) {
            g_quit = true
        }
    }
    socket.close()

在消息发送模块中,通过while (!g_quit)持续监听用户输入,

首先通过readln获取控制台输入,getOrDefault提供默认值避免空指针,完成用户输入部分。为了实现同步发送数据我们需要使用socket.write(),,若连接已断开会抛出 SocketException,同时设置 g_quit = true,触发后续资源释放。最后通过socket.close()完成资源释放。

好啦,我们也完成了客户端的构建,可以尝试我们的群聊服务器啦!

// 群聊程序-客户端完整代码
import std.socket.*
import std.console.*

const IP = "127.0.0.1"
const PORT: UInt16 = 24621
const BUFFER_SIZE = 1024
var g_quit = false

main() {
    let socket = TcpSocket(IP, PORT)
    socket.connect()
    // 读取服务器发来的消息
    spawn {
        while (!g_quit) {
            let data = Array<Byte>(BUFFER_SIZE, item: 0)
            var count = 0
            // 如果对端正常关闭连接,不会抛出异常,read 返回 0
            try { count = socket.read(data) } catch (e: SocketException) {
                g_quit = true
            }
            if (g_quit || count == 0) { break }
            println(String.fromUtf8(data))
        }
    }
    // 向服务器发送消息
    while (!g_quit) {
        let input = Console.stdIn.readln().getOrDefault {"\n"}
        try { socket.write(input.toArray()) } catch (e: SocketException) {
            g_quit = true
        }
    }
    socket.close()
}

3.3 启动群聊

使用CodeArts IDE for Cangjie完成构建客户端程序client.cj和服务器程序server.cj后,右键server.cj选择“打开所在的文件夹”进入项目目录:

2e59588c639399b188dcf368fd4ae8c7.png

在项目目录右键选择“Open Terminal Here”进入终端:

5669b712858c83de91ac02a88470b194.png

(* 注意:项目目录已个人实际项目目录为准)

对client.cj和server.cj进行编译:

cjc client.cj -o client.exe

cjc server.cj -o server.exe

49d05779496e96493b123007d3b7d00c.png

0365164549939350b26cf221cc13df6f.png

启动客户端和服务端的过程中,必须首先启动服务端,如果首先启动客户端,则会报错:

19ecd5bda59d6f50cda715ba887f1232.png

先启动服务端:

b759dbb2b172b708ffdeca7ac55a29a8.png

如果多开服务端则会提示报错:

e67f146fe4deb6a72c237c2c175a05c2.png

新打开一个终端,启动客户端:

94401085215ebf697e906c78dafb979e.png

但可以多开客户端,组成群聊成员:

e6ec34b0f5b05a1d546d6a9370c3625b.png

查看针对协程进行优化的代码部分,即Monitor 同步等待 (synchronized (monitor)):

7fb354147a492e5e4369392ec81422d9.png

删去这三段代码之前,不进行通信时,CPU 99.3%空闲,CPU负载非常低。

97878996d1a2cb8503a5a4b5a3ada4ca.png

通信的时候,负载升高,CPU 47.0%空闲,CPU负载较高

e98bc86ba0483a070077e6da8f5f3ddb.png

删去此三段代码之后,重新创建群聊。不进行通信时,CPU 0%空闲,CPU负载几乎占满。

0a34b5136305844d66cbb467acf68e3c.png

通信过程演示客户端发送信息:

7f74ddfb8e2aea732c0c8a87f5b9d492.png

消息首先会从客户端发送到服务端,服务端显示客户端发送的消息:

ebe5ec7d47ca6769e28ecf41e6e8f51c.png

服务端会将消息分发到处于群聊中的所有客户端,此时客户端二号和客户端三号也都受到了客户端一号发送的消息:

b6fcd2043f8e2346e85d7ebb750ffcd5.png

当单独在服务端发送信息时:

d0c7bc39679e1f3fafffd824eba22272.png

但是因为没有通过客户端发送到服务端,所以其他客户端都没有收到信息:

917ba582947c7ffb228ef95832e547fa.png

退出通信,通过在服务端发送q或Q来终止群聊:

c54acd7bb5c5970477edca7d1341228e.png

此时客户端显示服务退出

a071e533d5afe630b1ee0578d84dfedb.png

此时所有端口均退出群聊

1f757404c56219b5b44ad75c34eda01f.png

此刻,您已掌握仓颉TCPGroupChat的核心功能,它将成为您探索仓颉世界的桥梁,助力探索仓颉语言的无限可能。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐