您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12167|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;) |: ^  i8 Y9 p
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    8 a2 V) w7 o/ d6 m, @5 \

  3. 3 A5 E% V( \# U& {) U
  4. $worker = new Worker();9 D" J2 s( V/ P; d6 A
  5. // 4个进程9 l$ {$ ~; U2 H2 v0 z
  6. $worker->count = 4;) J5 {. l. v/ \% _" T8 m
  7. // 每个进程启动后在当前进程新增一个Worker监听
    # [1 ?3 r2 ]# l5 s/ V; W
  8. $worker->onWorkerStart = function($worker)
    8 l6 w7 [' ?+ }3 [# _
  9. {
    % {) J7 a) E  c
  10.     /**
    8 H$ i6 S) Q4 _: J
  11.      * 4个进程启动的时候都创建2016端口的Worker& X7 P$ [4 _# l9 ~2 ?' L4 L3 O
  12.      * 当执行到worker->listen()时会报Address already in use错误
    7 Q( M( \. o% |6 ~' J5 i1 W
  13.      * 如果worker->count=1则不会报错+ W' c, N! X1 }1 l1 k6 v8 P
  14.      */
    6 ~1 d9 v& ]: l! p5 c& K2 t
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 p+ h; V3 k3 Q' @
  16.     $inner_worker->onMessage = 'on_message';
    , }# Z7 Q# X, b/ Y
  17.     // 执行监听。这里会报Address already in use错误( F6 O0 H+ ?+ H* w) O$ `: {, r9 P
  18.     $inner_worker->listen();6 b4 I9 i2 o" _
  19. };7 q8 M" S$ x/ B. X& R2 n$ v& L+ B* @
  20. 2 j' u" ~0 e% O: p( s5 Q( g( l
  21. $worker->onMessage = 'on_message';* j6 L6 v+ A1 y4 @1 J$ |: G
  22. : Y' B0 p  i+ v; Z" x
  23. function on_message($connection, $data)
    * L' V8 j+ |. w2 h( ~
  24. {  o# W( ^+ m! y: @# f' v8 J& V2 x  E+ x8 s
  25.     $connection->send("hello\n");6 N% l* v& a; p, Z8 f; C' }) |
  26. }
    % U3 ]9 D& `* _  n0 v/ t9 D# S
  27. ! [7 U* M* |' l
  28. // 运行worker
    . A) Q2 K  @! b- M6 r
  29. Worker::runAll();
    0 I  u, ~) B. |7 m, @8 V
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 f3 W4 u6 D9 y1 _8 d
  31. 1 X7 I6 W: F" U/ q' ~
  32. use Workerman\Worker;
    . U2 @! l! K' U5 ~- w* H  W
  33. require_once './Workerman/Autoloader.php';1 z' S" A/ O7 S2 \$ t' j1 i# h7 m
  34. 5 y0 N$ J' K3 s6 u. x+ D
  35. $worker = new Worker('text://0.0.0.0:2015');* a: |9 D& l$ ^, n3 E7 D' ~  u
  36. // 4个进程  i( S5 w# \$ i- C* A6 t8 g; }( a
  37. $worker->count = 4;( D$ z& T6 J  I) |& n; W
  38. // 每个进程启动后在当前进程新增一个Worker监听& y- b. o6 t1 K# l9 B
  39. $worker->onWorkerStart = function($worker)
    " r) Q: @9 a( }# n0 b8 i/ _) f
  40. {
    . F4 s+ Q) y8 `- t% o
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');3 V8 X# _8 ^2 G0 k/ z
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)6 e! U! q" B  o: h6 c: F
  43.     $inner_worker->reusePort = true;
    ( w' @0 D# \" f8 F& i+ j" {# i
  44.     $inner_worker->onMessage = 'on_message';
    * h% f& }9 X) z+ o7 H, G" Y8 Q, p
  45.     // 执行监听。正常监听不会报错6 y7 u( N% g' q; P; ]7 E4 h% E
  46.     $inner_worker->listen();
    $ A/ l$ d- R7 d4 U9 y
  47. };0 g$ b! X. _& \. N8 ?2 A$ v
  48. 4 e" T! v! `6 T2 g: b- ^
  49. $worker->onMessage = 'on_message';
    4 r; _8 c) p. h

  50. " J1 J$ R1 M  Q
  51. function on_message($connection, $data), P) a! }" w* ?7 R; q% ?) m2 s  o
  52. {
    % N% E7 |; A/ k6 h) o
  53.     $connection->send("hello\n");* w3 @1 |+ L; m; l* l
  54. }
    ) Z1 ~2 z2 u5 K! [% |% P! {% K! p1 ]+ G
  55. 6 N2 R7 k. |5 u- G1 _9 t% w- z
  56. // 运行worker
    3 I4 J7 W) n" k# M
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: h2 {$ `' V8 P7 l% Y
  2. use Workerman\Worker;
    4 ~! m1 K) s0 B8 Y, R& ]' j5 P
  3. require_once './Workerman/Autoloader.php';6 U& n# D5 N$ {$ w" T  x
  4. // 初始化一个worker容器,监听1234端口
    , W9 L0 N7 t) {$ m8 h8 J0 x
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    6 O' \! ~* N5 W' K: d6 c6 D

  6. ; `! j( k8 D* H2 J+ \
  7. /** ]! y* e" k2 e4 f' t7 K7 e
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    # m3 q) w. O: C# R4 t
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    7 ~5 _( J5 [4 S: Q
  10. */
    ( n. }! s/ s+ V1 |
  11. $worker->count = 1;
    , E3 [7 W1 a: \% o7 \. W
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, k& ]) {) f0 s5 D- C0 K. j9 g' \
  13. $worker->onWorkerStart = function($worker)
    - S* u, U; f# }5 p7 a$ G
  14. {
    + Z0 F% E8 }7 n9 r
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ {' q8 w5 X  c% ]- p3 ~
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');1 |6 i% E: `& H( B& N, ]' `  v' q
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    1 A2 `% ]1 \, x+ [' O$ |7 B* o
  18.     {
    * e2 l) U2 c& G
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据, {; d" O6 C- V3 u$ A- g. z
  20.         $data = json_decode($buffer, true);, G; D5 a6 @  ?6 g
  21.         $uid = $data['uid'];) u6 H; W7 c$ c1 ~( `* i
  22.         // 通过workerman,向uid的页面推送数据% l3 \4 V# g+ ]& ^+ t. t! h/ o0 H! G4 @
  23.         $ret = sendMessageByUid($uid, $buffer);$ U5 T5 U4 m3 o! _
  24.         // 返回推送结果
    , \5 {% k$ G" Y* S$ H
  25.         $connection->send($ret ? 'ok' : 'fail');' \* o8 v4 x! {* ]6 G
  26.     };
      d6 B; J1 r; e" Q% @; G- ]$ Y
  27.     // ## 执行监听 ##! S8 o6 M7 Z9 k& {7 H: y8 o! f2 Y# q. X
  28.     $inner_text_worker->listen();5 Y: b& T3 E5 o9 s
  29. };+ R* Z) T2 d- t* ]! r+ L, ]- y
  30. // 新增加一个属性,用来保存uid到connection的映射
    ( K, [5 d! ^/ P( T! Z' l
  31. $worker->uidConnections = array();2 t7 t' Q/ U* g. j
  32. // 当有客户端发来消息时执行的回调函数. W# d* Q) E9 I& N1 x! Y
  33. $worker->onMessage = function($connection, $data)
    ( A; L& F' x  S3 H, M6 y0 C
  34. {
    " u/ e8 Z# j, i& J
  35.     global $worker;5 X0 z4 R- Y% `6 G) w( I! s
  36.     // 判断当前客户端是否已经验证,既是否设置了uid3 w4 k* T) X6 |8 @  ]( |: o) R- {4 M
  37.     if(!isset($connection->uid))
    ) j# K. _+ P3 V& f% z
  38.     {
    / k2 r/ ?0 l6 r0 P
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    % h; F0 [  R/ P/ N  Q9 I9 U
  40.        $connection->uid = $data;8 N" q7 m' l9 E
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! g0 m2 c9 ~: O+ n; X: ^
  42.         * 实现针对特定uid推送数据
    3 A& V, d" u2 X8 m; o% u
  43.         */
    ( d; G, j4 Y" W6 ~, F7 r0 u5 f% W
  44.        $worker->uidConnections[$connection->uid] = $connection;0 h/ R9 S" @2 ]7 [8 ?- `5 N
  45.        return;
    : m+ X( d* S/ {9 K
  46.     }
    / ~( C# K* [4 Z# ~! l
  47. };; I5 P$ ?4 V$ G3 |

  48. 6 \1 n" z& o  h+ T" f, s( M/ N9 s$ d
  49. // 当有客户端连接断开时. F" G8 @* `- R! |. t+ S9 X
  50. $worker->onClose = function($connection)
    7 D1 |+ ]9 M6 B5 F2 L) v
  51. {
    5 l" N3 w: L* E  E6 \7 i& y  @: A
  52.     global $worker;# g& s; z$ I$ s0 |: e4 |
  53.     if(isset($connection->uid))) u' F" ^1 s" j' a6 f
  54.     {6 U0 U1 x4 L+ {* d  a1 s
  55.         // 连接断开时删除映射; j( `1 F" |$ f, Z
  56.         unset($worker->uidConnections[$connection->uid]);
    ) u8 g) Y: F" r- m# F9 r
  57.     }
    : ]% _: Y; D& w: |
  58. };
    8 q* i# L5 i' H4 \4 D* U

  59. ) }9 n; E% ~6 E$ V% Q
  60. // 向所有验证的用户推送数据: y/ k+ }; |  n5 N- a
  61. function broadcast($message)$ y# U% i1 h3 {/ ^2 T0 l! k, i# X: Y( t
  62. {
    ) o- D5 A, s% Z, b% ~$ t) |
  63.    global $worker;
    1 Q# r" B! ?6 x/ _$ H
  64.    foreach($worker->uidConnections as $connection)4 n% s% u9 g6 n4 `7 ?
  65.    {; u9 W! K/ E5 V: R: f+ Y+ E
  66.         $connection->send($message);2 Y4 i: [6 K" }1 v
  67.    }
    ' Q) a8 N/ n# B) o9 j" K: z
  68. }% D6 T9 U4 k6 b8 O: s
  69. " p3 G% M2 t; M6 c& t0 |8 P4 ]" ^
  70. // 针对uid推送数据
    / p7 H! Z' e( y% c* c- P" D0 P. J
  71. function sendMessageByUid($uid, $message)
    % U  A) t, ?) v0 R3 E) Q: ]
  72. {6 T: L4 ~% p3 ?/ M! M
  73.     global $worker;+ }4 D& |: D/ E+ R% \0 Q2 C
  74.     if(isset($worker->uidConnections[$uid]))
    1 g9 d' }- z0 j" U  x, k
  75.     {
    / H6 y; \. k# D& n: Q5 U
  76.         $connection = $worker->uidConnections[$uid];+ M0 ~1 f1 r- J# }
  77.         $connection->send($message);7 d/ v/ B+ T1 _- P
  78.         return true;6 y. K! X3 s2 p! P1 f8 F
  79.     }+ F' N( Y2 U- e+ k: w
  80.     return false;
    ' D+ |' ^  w/ J: L, J3 R9 S0 H9 p
  81. }$ u  m" ]$ w5 `) o+ c2 t+ u

  82. $ v6 z8 L  h6 U
  83. // 运行所有的worker+ F4 l% ]  O, C( J
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');  S" m" X: o# P9 G+ X* f8 y
  2. ws.onopen = function(){& Z! r, x/ o" t3 O7 d1 N
  3.     var uid = 'uid1';% k5 Q, I% R6 t# i8 L6 P
  4.     ws.send(uid);
    3 B2 d: `2 O  O% J$ h
  5. };; I2 a6 w0 S# h% m1 u! g9 Y. v/ g7 p
  6. ws.onmessage = function(e){
    * A1 V" T. z' e
  7.     alert(e.data);
    * I' D1 a. c' W7 G- T) C! C# W
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口# z& h: ?. C( t* F
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    3 {. s. e5 M; h3 L& u
  3. // 推送的数据,包含uid字段,表示是给这个uid推送  }6 S6 P3 P- ^0 S% L; H1 M
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ) o/ G7 ^: T/ D2 j  v: q$ m0 h
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    " X, A( m2 ^5 \: Q
  6. fwrite($client, json_encode($data)."\n");
    7 J1 D& l4 z, T/ ], `4 K
  7. // 读取推送结果
    5 S4 U; d: x, O( J" j3 w/ P
  8. echo fread($client, 8192);
复制代码

% X0 D5 l& S  q* a$ N' M( B: D. L, l
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 20:31 , Processed in 0.105046 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!