您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12179|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    , T! N: _2 k. G- Q: b9 g; Z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    7 w2 p' i% n! y0 D' \' q& N

  3. 2 {' r2 w* Q. P" e- n; a% s7 Y" ^& C
  4. $worker = new Worker();6 U# h2 d. _1 r8 Z
  5. // 4个进程
    2 N3 d9 ?( Q3 p* s) [' p
  6. $worker->count = 4;
    , D6 \, e# T/ Q: D( V% u1 C
  7. // 每个进程启动后在当前进程新增一个Worker监听
    7 B% E- T- o0 d* d+ P$ O4 t
  8. $worker->onWorkerStart = function($worker)
    ; m$ f$ }9 H. r5 a! }6 V2 x
  9. {
    " T' Y, v9 L  |6 P7 z
  10.     /**# }. Q" ]( n2 |8 k
  11.      * 4个进程启动的时候都创建2016端口的Worker: a  `- D. V+ s& l
  12.      * 当执行到worker->listen()时会报Address already in use错误, x# O) R5 W+ L1 M+ r0 W* ~) [. t
  13.      * 如果worker->count=1则不会报错
    6 c/ s3 [! }9 f0 [  Y/ D3 B
  14.      */* i5 z8 C: \- J4 c' z2 b! N9 B7 v6 V
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');# J9 {0 s, \% R; d& |) a- Z& G
  16.     $inner_worker->onMessage = 'on_message';
    ( n: e! y1 \0 F% c; ~& Y
  17.     // 执行监听。这里会报Address already in use错误
    0 z3 J5 O) e6 O' |' l% s
  18.     $inner_worker->listen();
    ' y7 p+ o! |0 h) O
  19. };
    2 I( t+ E& C( e. P1 h; m3 p9 b
  20. 3 h* T! U' H7 c
  21. $worker->onMessage = 'on_message';; d' h0 K: F1 o# l% ?1 k1 _
  22. % ?4 E/ x' u& J% E
  23. function on_message($connection, $data)
    & v0 Y8 R& b- |0 z# Z
  24. {
    : m  o4 p9 j+ X6 g1 E0 K
  25.     $connection->send("hello\n");
    * ~! s# F' I8 ]) ?# _9 j
  26. }* J! n) x; t( t. n
  27. % A' z2 `$ E2 C
  28. // 运行worker
    " f( e# G7 L$ q. M
  29. Worker::runAll();
    / m: g7 ~& r2 l8 B% e
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:" e* J9 O4 ]( x& e: p& p" Q

  31. 2 ~6 R# n8 Z8 b9 x; {
  32. use Workerman\Worker;
    0 c4 e" w  D3 s2 D( m0 }: G: g, }. M
  33. require_once './Workerman/Autoloader.php';
    $ x  z3 [  n! y: u+ Q/ S

  34. # [6 a/ R+ z. s# t
  35. $worker = new Worker('text://0.0.0.0:2015');6 c" J/ K6 g0 Y' W% {
  36. // 4个进程
    9 @$ X" K, @! [# P; D
  37. $worker->count = 4;+ f. g/ [5 h8 i! W$ G
  38. // 每个进程启动后在当前进程新增一个Worker监听+ p, Y7 q" L4 X; @5 c
  39. $worker->onWorkerStart = function($worker), w, [  k' u" G) g
  40. {
    ' p, g: f5 b3 w! d
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');. \" ~8 k0 u4 ?7 N8 ^) G( u
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0); Z# w, {9 F% |1 G- r
  43.     $inner_worker->reusePort = true;
    % Z) N4 R4 q) u# n6 i7 w
  44.     $inner_worker->onMessage = 'on_message';
    + p+ [) T3 j' Q" R; }4 x3 G0 H
  45.     // 执行监听。正常监听不会报错
      ~1 @2 I# g2 R) O
  46.     $inner_worker->listen();) f% l4 i. {3 g  e  Z
  47. };
    : ~8 f" W( X- |5 \! c1 }# h

  48. ' m, f2 B+ q3 A$ X% z1 C2 {- M
  49. $worker->onMessage = 'on_message';
    6 x( ~3 L" n$ d2 C: S( K
  50. " Y/ K# a5 n. `
  51. function on_message($connection, $data)  R1 ~5 f. w( F! V/ [: l6 Z
  52. {, w, B7 _1 t/ S0 ^; U6 o% Q
  53.     $connection->send("hello\n");- V$ p3 f# S0 j8 d1 c
  54. }! ~& w  A" q, `9 q

  55. % E  `; n  \) l6 e! l- l
  56. // 运行worker
    - A# E/ _) N6 t! s$ a# u" [
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    1 u& H5 g- v- S6 l; I: z
  2. use Workerman\Worker;3 g# ?8 V# u8 c( Q
  3. require_once './Workerman/Autoloader.php';; C  W) e1 s( H/ U% ]6 Y
  4. // 初始化一个worker容器,监听1234端口
    & j# h& _6 t" v5 Z$ q" G- x
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    3 }) T+ y) w& [8 G6 r
  6. ; o, ^2 P" V$ G* g
  7. /*2 p3 h& Y$ i8 A$ s- o& ]% f3 o4 e
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误% ~& @9 q+ m2 k
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). K2 W7 f, `  r$ g" A+ Y
  10. */- W. i0 I: l( J* W0 V* E0 J; w; f
  11. $worker->count = 1;+ y7 E+ q% M' t: l; @1 [) L
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口5 p5 ^- V% R- Y) k4 A
  13. $worker->onWorkerStart = function($worker)
    8 G9 G& I/ D$ p, I' p: e
  14. {
    % j+ b) I7 x8 {; }, _( |9 o
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ' Y9 s5 R, Q: E; `" f1 m* G
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');+ s3 w* h) z! v# F
  17.     $inner_text_worker->onMessage = function($connection, $buffer)9 C2 t3 c9 f6 J" B. _8 g6 [- b/ x
  18.     {/ b1 D* L! O: t/ k
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据: a1 k! R. i1 R& ^
  20.         $data = json_decode($buffer, true);
    5 v8 L; P+ ^1 j/ b2 R! ^
  21.         $uid = $data['uid'];) w- M# E5 |5 r# m( r
  22.         // 通过workerman,向uid的页面推送数据
      M: m* s; V( R" c: s3 e
  23.         $ret = sendMessageByUid($uid, $buffer);3 S% d; P6 p5 p( V
  24.         // 返回推送结果
    ) V) }6 n0 C- B2 L* g
  25.         $connection->send($ret ? 'ok' : 'fail');) O3 M! r. T$ c6 y0 C2 x9 _
  26.     };* V6 j1 I0 r7 L. `* l
  27.     // ## 执行监听 ##
    + C* m8 H1 n1 n: M6 n* r0 ?
  28.     $inner_text_worker->listen();  N. d* Q; ]9 {
  29. };- w  x9 p7 Z0 @$ x2 m
  30. // 新增加一个属性,用来保存uid到connection的映射
    : }. u2 w  x0 m+ z1 O; M5 l
  31. $worker->uidConnections = array();7 i) g$ o- A7 f7 `
  32. // 当有客户端发来消息时执行的回调函数
    . L/ _2 k$ u0 q: R+ `& U* d
  33. $worker->onMessage = function($connection, $data)8 X/ J" e2 j8 v7 c
  34. {0 B( y9 i: y/ a" ?
  35.     global $worker;
    9 A" m: R3 p5 c4 F4 n1 ^3 ^
  36.     // 判断当前客户端是否已经验证,既是否设置了uid: i3 E, y/ F7 b
  37.     if(!isset($connection->uid))' c. n) Y7 }, A7 z
  38.     {
    " @9 x" T; m) E+ K9 S- s
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
      f: E1 N* O, B) d
  40.        $connection->uid = $data;
    5 @- P7 ?+ J/ x9 q+ Y
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    5 L! E1 t& u( f* t9 }
  42.         * 实现针对特定uid推送数据8 ]" r5 E$ L# k; w5 |. {0 M; {' z
  43.         */
    3 b: P7 @# g4 q5 r1 R( y
  44.        $worker->uidConnections[$connection->uid] = $connection;4 i4 t& P# Y# Y5 I* s5 ]0 B
  45.        return;& m2 }! X5 f: I7 ]
  46.     }- l: x( a- c4 ~+ X5 m
  47. };0 M7 H/ m. i/ k& {! h

  48. ' D1 z5 r* v; D* h' L" D; v( }
  49. // 当有客户端连接断开时" I/ E2 t, A& g
  50. $worker->onClose = function($connection), {# a3 m  I+ M
  51. {
    / t/ d7 _3 w" z) `, S: E
  52.     global $worker;
    2 b) O# r6 B" i; ~- W7 l
  53.     if(isset($connection->uid))
    + d( K8 m" Y+ t$ a$ l' w
  54.     {
    ) t) g7 Q! F4 y: I, d7 N6 g9 J
  55.         // 连接断开时删除映射  h. N9 |* n: j& F! G
  56.         unset($worker->uidConnections[$connection->uid]);! J3 b+ H% `4 ^  Y9 ~- \3 p
  57.     }
    & G. y% A) }! v: x" q, V0 @/ |) |
  58. };& s% d/ f0 {9 o" w  x& b

  59. 6 }8 y* j1 w  K- Q
  60. // 向所有验证的用户推送数据3 K, M5 s! p5 T- `/ |
  61. function broadcast($message)/ d2 g9 Z2 }: L& \( f
  62. {1 X# [  H1 T. a- P3 j
  63.    global $worker;
    2 |9 b7 F& b! _9 b' U
  64.    foreach($worker->uidConnections as $connection)
    6 C1 O3 z/ O2 }7 q9 D! p5 @
  65.    {8 H: L& H: Z6 v& X0 `) z/ v
  66.         $connection->send($message);
    " H" ^) h4 z  c% G6 L% R2 }
  67.    }
      \& v4 ]' U% |+ C: e$ I
  68. }
    0 r, |) H; I6 l3 W$ `0 V* E

  69. , t8 m! }4 Q' v1 {# g
  70. // 针对uid推送数据. n; e/ a% K# ?% Y# w) o; a
  71. function sendMessageByUid($uid, $message)
    3 _5 B1 l# c9 \* v  H
  72. {2 S0 y) [8 l7 p+ m
  73.     global $worker;
    2 L8 K, [0 ^8 r2 [  A3 ]5 o, I
  74.     if(isset($worker->uidConnections[$uid]))
    " W' T) c4 Z8 S4 T5 M8 E) X
  75.     {
    , I1 B. C! D$ Q$ W% w& K$ d' K
  76.         $connection = $worker->uidConnections[$uid];' M  L& ^5 c- J; Y) u& }9 B
  77.         $connection->send($message);
    2 J9 O5 ^+ Z9 R9 i* v  ?, _
  78.         return true;
    4 B$ r) r/ h# d# w
  79.     }
    " Q6 b# V$ M4 W: G) U% H. d
  80.     return false;. a2 o2 E/ v- o4 r! b( D) R
  81. }2 N. {/ F; J0 |  V+ B; W

  82. ) |/ l' U* R6 L8 V6 V% O0 _* _- f
  83. // 运行所有的worker
    * i( B9 r( j7 f" [, A2 |
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    6 L3 m0 b! z! d3 ~1 u* q! g% e
  2. ws.onopen = function(){6 y0 m* m/ r+ V
  3.     var uid = 'uid1';
    % q% T5 O! v" U% `% C* W. o
  4.     ws.send(uid);) w% [% L, g& f7 q
  5. };. y) @! @! |- E" i' u
  6. ws.onmessage = function(e){
    ! n5 Q, ^3 N; ?3 M( X$ A: j
  7.     alert(e.data);& G$ U% t% q& W
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    ) i) s2 \. Q5 ]' u% i' M7 o% v
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    1 Q5 n1 G* z+ U" x( Z4 ^
  3. // 推送的数据,包含uid字段,表示是给这个uid推送! d* q8 j+ o) F$ f" z! G
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    6 I+ Y0 |& k4 z- b3 b+ n
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 j9 ], C/ K3 u( x" ]* ]' ^! O
  6. fwrite($client, json_encode($data)."\n");  W1 s5 e5 w) b" Y6 f/ c; Q! b% k
  7. // 读取推送结果
    / N' C* K5 D3 f" {% ]5 k  C8 S
  8. echo fread($client, 8192);
复制代码

* H+ z$ [% `% J: B3 f- ?
+ T% I  F& t# i* |
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-23 02:15 , Processed in 0.126049 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!