当前位置: 代码迷 >> 综合 >> 基于erlang gen_server 实现的tcp通讯
  详细解决方案

基于erlang gen_server 实现的tcp通讯

热度:48   发布时间:2024-02-23 08:59:38.0

1、前言

erlang 的socket编程都是基于两个自带系统模块,分别gen_tcp和gen_udp。本文先实现erlang tcp通信实例。
模拟一个业务场景:erlang目前广泛用于编写游戏服务器,每一个玩家连接进服务器之后就会启动一个玩家进程,在建立连接之后,客户端就可以与服务端进行数据交流,然后才可以进行各种用户操作,例如升级等等(详细的业务逻辑,实际上就是数据的管理然后修改玩家进程的state)。今天就来模拟一下,玩家通过tcp连接进服务器之后,创建一个玩家进程,客户端与该玩家进程进行数据通信的业务场景,每一个客户端都有一个对应的进程与其进行通讯。编写代码前首先需要了解用到了gen_tcp的哪些方法,如何建立连接。

2、实现思路

1、通过gen_tcp:listen(Port, Options). 来监听一个端口,所有连接都通过该端口号。Options是一个列表,可以选择监听参数,具体可以查看官方文档,常用的一般是
{reuseaddr, true}:允许本地重复使用端口号
{active, false}:设置套接字为被动模式。套接字收到的消息被缓存起来,进程必须通过调用函数 gen_tcp:recv/2 或 gen_tcp:recv/3 来读取这些消息。
{delay_send, true}:数据不是立即发送,而是存到发送队列里,等 socket 可写的时候再发送

2、在监听端口号之后就要监听连接,使用的接口就是gen_tcp:accept(ListenSocket)或者是prim_inet:async_accept(ListenSock, -1);这两者的差别就是同步和异步的差别,accept会阻塞进程,后者是异步执行。但是实际上两者都是一个一个执行,不会说异步就是可以一次处理两个链接。这里先实现accept,后面再去实现async_accept,实际上唯一的影响就是在等待的过程中,这个进程能否处理其他事务。
3、客户端请求链接。使用的是connect(Address, Port, Options),这里参数是IP地址,端口号,选项一般是[binary, {packet, 0}]。4个参数的话最后一个就是timeout值。
4、建立玩家进程与客户端的联系。使用gen_tcp:controlling_process(Socket, Pid)。为什么要转移呢,默认是会把客户端信息发给创建Socket的进程,而监听连接进程一旦调用accept就会阻塞,处理不了客户端发来的数据;即使使用异步的async_accept,一个进程既创建连接又进行所有用户的数据交流,也显然不妥。所以需要把Socket的“所有权”转交给玩家进程。
5、数据交流。一旦tcp连接建立之后,就可以调用gen_tcp:send(Socket, Data).来进行通讯
6、断开连接。gen_tcp:close(Socket).

整体实现思路就是在启动erlang节点的时候启动一个tcp_server进程用于监听连接,在init()的时候监听端口号,同时监听等待连接。然后一旦有客户端连接进来了,就把Socket作为参数,起动一个玩家进程,同时在state里面保存这个socket用于通信。tcp_server进程则继续调用gen_tcp:accept(),等待下一个玩家连接。tcp通信是以消息的形式发送给进程所以必须实现 handle_info({tcp, Socket, BinData}).

代码实现

tcp_server:

-module(tcp_server).%% API
-export([start_link/0
]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).-include("common.hrl").-record(state, {
    num = 0, socket}).-define(port, 5677).
-define(host, {
    192,168,0,0}). %% IPstart_link() ->gen_server:start_link({
    local, ?MODULE}, ?MODULE, [], []).%% ---------------------------------------------------
%% 回调函数
%% ---------------------------------------------------
init([]) ->{
    ok, Listen} = gen_tcp:listen(?port, [binary, {
    packet, 0}, {
    reuseaddr, true}, {
    active, true}]),accept(Listen),
%% {ok, _Socket} = gen_tcp:accept(Listen),
%% gen_tcp:close(Listen),State = #state{
    socket = Listen},{
    ok, State}.handle_call(_Request, _From, State) ->{
    noreply, State}.handle_cast(_Msg, State) ->{
    noreply, State}.handle_info({
    'EXIT', _, normal}, State) ->{
    noreply, State};
handle_info(_Info, State) ->{
    noreply, State}.terminate(_Reason, _State) ->ok.code_change(_OldVsn, State, _Extra) ->{
    ok, State}.accept(Listen) ->case gen_tcp:accept(Listen) of{
    ok, Socket} ->case tcp_role:start_link(Socket) of{
    ok, Pid} ->?INFO("============== ~w", [Pid]),case gen_tcp:controlling_process(Socket, Pid) ofok -> ok;_ -> okend;_ ->?INFO("转换失败"),okend;_ ->?INFO("监听失败 ===================="),okend,accept(Listen).

tcp_client

-module(tcp_client).
%% API
-export([start_link/0, connect/1, send/2
]).-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).-include("common.hrl").-define(port, 5677).
-define(host, {
    192,168,0,0}).-record(state, {
    socket}).start_link() ->gen_server:start_link(?MODULE, [], []).connect(Pid) ->gen_server:cast(Pid, {
    connect}),ok.send(Pid, Str) ->gen_server:cast(Pid, {
    send, Str}),ok.init([]) ->{
    ok, #state{
    }}.handle_call(_Request, _From, State) -> {
    noreply, State}.handle_cast({
    send, Str}, State) ->SendStr = erlang:term_to_binary(Str),?INFO("=============== ~w", [SendStr]),gen_tcp:send(State#state.socket, SendStr),{
    noreply, State};
handle_cast({
    connect}, State) ->SomeHostInNet = ?host,{
    ok, Socket} = gen_tcp:connect(SomeHostInNet, ?port, [binary, {
    packet, 0}]),{
    noreply, State#state{
    socket = Socket}};
handle_cast(_Request, State) -> {
    noreply, State}.handle_info({
    tcp, _Scoket, Data}, State) ->?INFO("客户端进程收到数据 ~ts", [Data]),{
    noreply, State};
handle_info(_Info, State) -> {
    noreply, State}.terminate(_Reason, _State) ->ok.
code_change(_OldVsn, State, _Extra) ->{
    ok, State}.

tcp_role

-module(tcp_role).
%% API
-export([start_link/1
]).-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).-include("common.hrl").-record(state, {
    socket}).start_link(Socket) ->gen_server:start_link(?MODULE, [Socket], []).init([Socket]) ->State = #state{
    socket = Socket},{
    ok, State}.handle_call(_Request, _From, State) -> {
    noreply, State}.handle_cast(_Request, State) -> {
    noreply, State}.handle_info({
    tcp, Socket, Data}, State) ->Str = binary_to_term(Data),?INFO("子进程收到数据 ~ts ~w", [Data, Str]),gen_tcp:send(Socket, ?T("我收到了")),{
    noreply, State};
handle_info(_Info, State) -> {
    noreply, State}.terminate(_Reason, _State) ->ok.
code_change(_OldVsn, State, _Extra) ->{
    ok, State}.

测试的时候是启动了2个erlang节点,一个erlang节点1启动tcp_server,另一个erlang节点2启动tcp_client。当连接的时候会在节点1启动一个tcp_role进程。

优化

1、tcp_server进程在启动init()的时候直接调用accept()这样会导致进程直接进入阻塞,连后面返回的state都没有执行,这里应该在init()的时候先listen(),然后向再向本进程发一条消息,起码等init完之后再去监听连接。self() ! {accept, ListenSocket}.然后在handle_info()的时候再去accept。
2、如果需要增加服务器的可靠性和稳定性,这里就必须增加监控树(supervisor)的概念。因为进程一旦崩溃(如果代码哪里报错或者其他奇奇怪怪的问题导致进程被kill掉),那么就连接不上服务器了。所以,加入监控树,一旦进程自然退出或者错误退出,都会重启该进程。这也是为什么erlang会比较稳定的原因,出现错误,进程崩掉就重启呗,又不影响其他进程,其他功能还是能继续执行。
3、如果再加一个需求,需要配置一个最大连接数。可以在每次gen_tcp:accept()返回成功之后,向进程字典存一个递增数,然后判断是否超过最大值,然后再判断是否close掉这个socket。
4、使用阻塞式的accept()会显得代码比较生硬,很多东西处理起来不太方便,比如需要向state里面存一些数据,那就得在accept()之后不能直接调用accept(),而是在一次连接完成之后向自身进程再发一条消息去accept()。

handle_info({
    accept, ListenSocket}, State) -> {
    ok, State1} = accept(State),self() ! {
    accept, ListenSocket},
{
    noreply, State1}.
accept(State) ->case gen_tcp:accept(State#state.socket) of{
    ok, Socket} ->case tcp_role:start_link(Socket) of{
    ok, Pid} ->?INFO("============== ~w", [Pid]),case gen_tcp:controlling_process(Socket, Pid) ofok -> ok;_ -> okend;_ ->?INFO("转换失败"),okend;_ ->?INFO("监听失败 ===================="),okend,{
    ok, State}.

假想一下:
erlang进程间传递信息数据是通过发送消息的,接受消息然后存放于消息队列。如果在执行accept()一个连接的时候,就已经有下一个连接的消息(消息A)存放于消息队列,而处理完accept()之后需要向自身发一个消息(消息B)才能继续accept,那这时消息A在消息B前面,就会出现进程没有监听连接,但是又需要连接,这时候是不是就会连接不上呢?

综合各种考虑,我觉得还是实现prim_inet:async_accept(LSock, -1)这种异步式监听比较靠谱,而且比较自由,进程不需阻塞,进程还可以处理其他问题。

关于异步的实现和假想问题的证实就下次再补充了。

本人erlang的开发经验大概有个大半年,第一次写博客,还真不容易,很多理解上的东西,还是没能用语言表达出来。如果代码有什么问题欢迎留言或者指出不足~~~~~~