您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12169|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;+ q# ]$ h: v/ o
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    & x1 e3 M# e2 k0 e5 ^" v& a& _" x
  3. ; b5 v. B9 M7 ]7 E2 ^( o9 q
  4. $worker = new Worker();
    3 T# E; g* p. x4 ~% x: k
  5. // 4个进程4 y* b( U5 a! h- D* |* a1 b
  6. $worker->count = 4;4 Y/ K- d+ h1 ]  w: R+ {+ A: R
  7. // 每个进程启动后在当前进程新增一个Worker监听
    5 h' Y! b  X% M% f
  8. $worker->onWorkerStart = function($worker); r4 n7 \) a. f. [; G1 _
  9. {
    " l2 G5 E& H8 b# w# s, u
  10.     /**
    9 P$ _3 \" p% q# Y! A+ i- }
  11.      * 4个进程启动的时候都创建2016端口的Worker0 ^3 z3 V2 o0 d( c
  12.      * 当执行到worker->listen()时会报Address already in use错误
    8 H7 x: K8 R9 x- g. D" ?  o
  13.      * 如果worker->count=1则不会报错
    2 M6 Q$ P$ Q( C( ~' O  S2 `
  14.      */, o* l. z5 m$ ]7 [3 k. x' Y; a
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');- @. e6 p: v% S0 i8 e
  16.     $inner_worker->onMessage = 'on_message';
    ; ~/ b; |: J$ k9 u- |- a" {
  17.     // 执行监听。这里会报Address already in use错误
    ; G/ [; ^8 ~5 f
  18.     $inner_worker->listen();
    : D; b3 J& [% P: |/ I
  19. };. Z4 c& D2 A2 Q6 E0 O1 o
  20. 2 c& E: g: O2 J4 n: V! G
  21. $worker->onMessage = 'on_message';
      m9 ^6 v0 |) T6 {+ N% X3 X! _
  22. 7 O9 E! {5 g3 A( {& `. N
  23. function on_message($connection, $data)7 A. i" h6 e3 t4 X
  24. {
    8 [  s- A0 @" ^  j- P
  25.     $connection->send("hello\n");: \9 |$ g+ G7 M" ?  e
  26. }
    , C6 }( I' [$ Y. M# t$ O, _

  27. / j" W$ o9 G& I' F
  28. // 运行worker
    5 ]/ C% ]) ~: S4 T) H+ N
  29. Worker::runAll();: q# L- o, ?" n# E: {
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 K! E: F4 j8 ~8 Z8 c2 B3 n1 G
  31. ) I, _5 O, l: F' c. u* |
  32. use Workerman\Worker;9 u- Q7 k6 w7 [3 O1 o* Y+ }
  33. require_once './Workerman/Autoloader.php';
    ! I4 P* s% f4 T+ t" C+ g" E9 H
  34. 4 i: l& w. [( S
  35. $worker = new Worker('text://0.0.0.0:2015');
    ) N4 w8 f3 o# Y9 o4 e; [
  36. // 4个进程: L- @5 l- K; |  B% r
  37. $worker->count = 4;
    ' S7 Q  L3 n' X
  38. // 每个进程启动后在当前进程新增一个Worker监听; g7 P+ U% C# E) w$ N
  39. $worker->onWorkerStart = function($worker)& \' y6 r: S; k  f2 V$ u
  40. {, d8 u# q0 z0 {$ p; \) y* o8 _
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');/ M( J0 t# v. ~2 W. N9 P$ [
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)- z2 ^- e2 P/ {( x, @$ h
  43.     $inner_worker->reusePort = true;' x" ^& Q# E# B% E, B
  44.     $inner_worker->onMessage = 'on_message';& y$ m! P, Y6 D! ?  J, h( Q
  45.     // 执行监听。正常监听不会报错
    9 X/ p, S  [, B. ?8 o. \. o
  46.     $inner_worker->listen();
    " K8 D/ E1 ^! p3 u
  47. };
    6 N3 [7 K( W/ X

  48. # n( L2 \( i7 h
  49. $worker->onMessage = 'on_message';' Q0 o" b! y  f

  50. * F2 n1 k- {  D6 F+ J
  51. function on_message($connection, $data)  ?0 Z: D2 ?# b& o
  52. {* H- j  G0 N1 E; C' s* C
  53.     $connection->send("hello\n");1 |& O( S  [1 w- U8 S4 |
  54. }
    + [3 N2 E$ g1 N1 q( z

  55. . X7 `: d7 f+ N9 @
  56. // 运行worker+ O; |- B3 t- n1 F$ h" X" w
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    # \6 \" z( O  i; n( p$ g
  2. use Workerman\Worker;# Z/ I9 s6 d0 {% p4 o2 L
  3. require_once './Workerman/Autoloader.php';
    ( L( u# o6 Q/ x3 h) w+ ?8 n! m
  4. // 初始化一个worker容器,监听1234端口
    8 B$ ]% b) o5 J& p
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ' I' I5 x2 m( m5 d3 I9 V
  6. $ [% f' g. U' r# x& L
  7. /*
    ( t8 n9 `4 v6 E" h
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    6 c1 `% e# K0 D* ]& o# d- `+ h# A
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); d; o$ Q) _- i* k
  10. */
    2 f: p) Q4 u& `8 Z' J/ f: ?
  11. $worker->count = 1;
    9 i3 j+ S6 B2 O+ S
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口$ [3 C5 @9 M8 p: u( m) n( ]
  13. $worker->onWorkerStart = function($worker)
    ! N1 k  r. I. s+ l& p6 h
  14. {' }% T# H% B4 r
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 ]3 v: E; P+ M! K6 ~- t
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');* b% w" N* \: ^8 v
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    0 Z- B  T2 f5 X: J: w
  18.     {
    5 I; I+ w+ t. S" e" e
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据2 ~* d, b, l9 H
  20.         $data = json_decode($buffer, true);5 ^7 d6 u- n5 k; U$ O
  21.         $uid = $data['uid'];6 ?- \1 b8 S1 n+ y4 P& X$ z
  22.         // 通过workerman,向uid的页面推送数据7 k$ H9 ^9 u: z) e  m
  23.         $ret = sendMessageByUid($uid, $buffer);
    4 q. ~3 D, z) }; t5 [0 `' t
  24.         // 返回推送结果& W1 p& U6 u) b
  25.         $connection->send($ret ? 'ok' : 'fail');
    . E# P0 F4 A- n
  26.     };5 l- P1 T; U/ S: B
  27.     // ## 执行监听 ##
    + q+ r. k9 @+ I% G- r- a
  28.     $inner_text_worker->listen();+ r* c' a8 O0 {$ H4 `/ ^, R5 n( N
  29. };
    # y2 z& M! A1 R3 X
  30. // 新增加一个属性,用来保存uid到connection的映射
    1 u. Z3 n1 g' O! G6 {% b
  31. $worker->uidConnections = array();
    & z$ O7 @9 r( E
  32. // 当有客户端发来消息时执行的回调函数
    4 M' B3 }) z4 f" B, u3 X
  33. $worker->onMessage = function($connection, $data)
    1 r8 M  b- Y5 n' C% I: ]
  34. {
    ! p2 y& @" c+ S, m2 a+ V" z
  35.     global $worker;
    7 J9 E0 a1 p1 t$ x
  36.     // 判断当前客户端是否已经验证,既是否设置了uid+ ~0 W  |, J# G, I
  37.     if(!isset($connection->uid))5 D" K& U8 Z# f5 i! C* r) H- P7 H7 {% c
  38.     {
    4 I: z; y+ b9 j3 I5 X
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 i  }2 |% J% F! A& O+ s
  40.        $connection->uid = $data;  L2 ]; C/ G) Z4 _& f
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,2 R1 C5 X* U2 f9 j4 W! K
  42.         * 实现针对特定uid推送数据# L% p8 _* @5 K. B
  43.         */8 N& J; \* m# g4 @# r
  44.        $worker->uidConnections[$connection->uid] = $connection;. u* o8 P  _4 C3 W) e
  45.        return;
    8 s5 T7 j" g6 W" H
  46.     }
    2 \- o9 o  W( L- e7 f$ I& c
  47. };
    & P. C0 @( D  w& g/ j0 h

  48. . \9 ~  ~9 Z7 ]% ~- `: U; u
  49. // 当有客户端连接断开时, h3 Q4 [4 t2 e+ E
  50. $worker->onClose = function($connection)% y  N2 t0 n. |: u) z% {# \% h. h
  51. {# S; {1 m7 h! J# N: [
  52.     global $worker;
    2 Y) U; a- ^: a6 m/ t1 I( d
  53.     if(isset($connection->uid))) T$ z% e$ Y: K
  54.     {
    ; j. X+ T0 _0 O0 |+ A- W4 Z& B
  55.         // 连接断开时删除映射
    * S- |& N8 m* o" w) j4 B+ Q7 ^
  56.         unset($worker->uidConnections[$connection->uid]);
    , v! Y# {/ F$ A& o& Q2 a1 \0 p
  57.     }
    : U7 Z2 t9 I# k! u; ~# C
  58. };6 r% o/ s* t- v

  59. ( {3 E# U" }# v+ i# @2 b  t* I
  60. // 向所有验证的用户推送数据
    ! b) O" L7 h. w4 M! f: c
  61. function broadcast($message)
    1 l* S1 r5 R& W2 s" U0 b
  62. {( l* `7 P% p) S
  63.    global $worker;
    7 I  @5 Q; z. G/ G, K& J
  64.    foreach($worker->uidConnections as $connection)
    : l: T$ ^8 H: Q4 d
  65.    {
    $ a) H! H6 z. Z6 x( G* m- X' g+ ~, M
  66.         $connection->send($message);
    ( `# g  d) h9 A( s1 o
  67.    }
    1 D8 {. j+ _' n1 [
  68. }- B3 g) d' l/ U
  69. ! y1 ^* }+ F6 z4 K
  70. // 针对uid推送数据
    % t+ p+ J, i# V% H% v/ b
  71. function sendMessageByUid($uid, $message)5 H- w/ V: W* |/ U) g" h
  72. {
    * s4 g7 z- }( E" P. H4 ~
  73.     global $worker;3 Z9 k+ i% c, i' L% V3 q2 U
  74.     if(isset($worker->uidConnections[$uid]))/ U3 W, W$ S, m1 S% Y0 l
  75.     {+ Q9 y1 J' k9 a/ i4 {) Y
  76.         $connection = $worker->uidConnections[$uid];
    7 C5 I" J- w/ r* q) t( ]/ c; d
  77.         $connection->send($message);
    1 R0 k/ Y: F4 M7 g3 }8 }8 ]
  78.         return true;
    ; v# F( h4 Q4 q3 }/ l; l7 R
  79.     }+ q  |: h: m; ^% C4 w
  80.     return false;* H& d( ]! M: E  x6 K/ n, s
  81. }! c% [2 [% E/ ~$ h! C4 N$ W8 b0 f

  82. / w6 B- w8 O, q: P( F4 s$ `# {
  83. // 运行所有的worker$ n; j4 W# u, O) [9 D
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    6 j2 i7 q3 J9 f2 y' z6 b" k& b
  2. ws.onopen = function(){
    9 W* f$ ?3 h* E- s5 E* T: i
  3.     var uid = 'uid1';6 K0 F8 e2 R+ Y; J
  4.     ws.send(uid);
    & X9 \9 d) Q- z' H
  5. };
    - Z2 C+ C) X/ s) p
  6. ws.onmessage = function(e){
    4 Q( S( S, g/ B7 F5 e( M
  7.     alert(e.data);
    2 }( C! T/ p  I3 ?
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    . P3 S. k0 {9 t0 Y# ^+ S7 @7 a
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    8 L  Z, C, M, U' T) m. B4 h, A" r
  3. // 推送的数据,包含uid字段,表示是给这个uid推送: a: E4 L$ M, k0 i4 G7 {
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    8 w3 G3 r4 i# E
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ( F1 _1 h4 ~6 U- i3 J1 g4 l8 N
  6. fwrite($client, json_encode($data)."\n");
    ' z5 _8 b/ D& P' }' f3 ?& I
  7. // 读取推送结果
    ' w( {& J2 W( T8 b& Q
  8. echo fread($client, 8192);
复制代码
( f! \& Z- G% E

1 v5 n3 S+ a; W! F4 }
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 20:47 , Processed in 0.121527 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!