您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12166|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    6 i- V$ N6 W% z6 J0 E
  2. require_once __DIR__ . '/Workerman/Autoloader.php';* o% s+ \* n/ r+ k' T  }# X; [
  3. $ l* q) \# V. C; I) O8 v3 W7 u+ q
  4. $worker = new Worker();+ l4 q) ?8 ~# l' p& N8 |' C2 E
  5. // 4个进程" i; C- D% ]+ Z. V+ K1 \
  6. $worker->count = 4;
    + m  \" ^( b& g% V  `) D
  7. // 每个进程启动后在当前进程新增一个Worker监听: v: Q6 |* K! K* ~3 U/ L
  8. $worker->onWorkerStart = function($worker)
    3 }! E( A8 f0 y" F
  9. {" B* U1 M: k3 @% ^) ]
  10.     /**- r4 S' v* h$ D- F
  11.      * 4个进程启动的时候都创建2016端口的Worker
    1 W9 n0 J) s! C8 u  i
  12.      * 当执行到worker->listen()时会报Address already in use错误
    + ~# N5 D6 X' W' e
  13.      * 如果worker->count=1则不会报错. f+ c9 O  i# F- y" x
  14.      */! k7 j7 w1 z. j/ e" W
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    " a' K0 x& x3 R
  16.     $inner_worker->onMessage = 'on_message';- P- p5 S  R/ y% s: o
  17.     // 执行监听。这里会报Address already in use错误1 y, [7 m- c. D
  18.     $inner_worker->listen();# o% b% F5 Q7 l: x( @
  19. };7 t! D9 M$ K% V( b; `; n0 A) y

  20. % ^; m( @$ b# b0 F; y
  21. $worker->onMessage = 'on_message';
    6 r. B" P' ]  f
  22.   ^3 T1 _% H: f8 i4 b
  23. function on_message($connection, $data)% z1 h% O/ X; k$ J8 O- p- C6 b
  24. {5 }9 X! z9 A6 g; F# N5 R. ^& ?- g
  25.     $connection->send("hello\n");
    ( N; i% z. A5 [" u
  26. }5 ~! D& n* B7 [$ D) ]. N
  27. 3 v6 s4 H. t; U+ z+ D5 j) d
  28. // 运行worker
    / H4 Z/ E; I1 M2 V
  29. Worker::runAll();
    6 K) p4 G$ I* Y7 z7 m5 I
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:3 I1 G; g+ K4 _: @) y3 r3 k3 s" {% [
  31. 1 z8 W. ^) [  d8 e% }9 x9 G
  32. use Workerman\Worker;& n( r! y3 _: r* a& k
  33. require_once './Workerman/Autoloader.php';
    7 ]" n; n! N( w7 `/ x5 L5 ]6 X% v

  34. ! T- Z% J- b+ s/ \% I1 O7 A
  35. $worker = new Worker('text://0.0.0.0:2015');
    4 ^7 F4 K% ?. y% V& D
  36. // 4个进程$ b$ P5 y8 t: Q+ D  k4 ^; L. }
  37. $worker->count = 4;/ C) ^- S2 t+ x. X5 R. i3 H; G- M$ p
  38. // 每个进程启动后在当前进程新增一个Worker监听
    " d' \5 b/ R' e% |/ a& J
  39. $worker->onWorkerStart = function($worker)" |' s  X' B9 p. ^, B
  40. {
    3 R- r! e7 g1 r3 S/ C4 N
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    0 T3 ]. ]+ L- i- s, y2 t6 L
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)" {. O" L6 I+ ^6 Z6 x9 Y5 \- ^: v% M
  43.     $inner_worker->reusePort = true;- t" i$ I) t- P% `0 t  ^
  44.     $inner_worker->onMessage = 'on_message';
    % |1 ^! g, p+ }$ I
  45.     // 执行监听。正常监听不会报错
    - N8 ?0 ]* |( T5 ?. c/ c
  46.     $inner_worker->listen();
    - l  N) p  p2 _, \* X6 ?; E& h  n5 w
  47. };
    1 |8 F# W, A9 V1 \

  48. 2 \/ o( @6 Y4 R6 X9 H# q# g
  49. $worker->onMessage = 'on_message';: t! X1 F: J* ~% g, y" a, Q) L

  50. . `5 _. j: B% v) `% i
  51. function on_message($connection, $data)
    ) r5 K$ F% K! D# l! n- A2 ?
  52. {" \0 v; }, g+ u9 d* G
  53.     $connection->send("hello\n");* e7 P' N1 u; J: z9 t- T0 W6 S
  54. }
    7 \9 E& Y/ p+ o1 X( h' o  ?$ j, H! J
  55. ! X$ E7 F7 y# @/ w6 ^5 Z* ?
  56. // 运行worker
    ' |; Z  v5 x2 \# F( [
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
      N# ]4 M7 p9 q) l/ ?
  2. use Workerman\Worker;$ |, j; U; ]: ?
  3. require_once './Workerman/Autoloader.php';
    ' \( u4 I% Q$ ~- J
  4. // 初始化一个worker容器,监听1234端口
    * \* B  c, T8 M7 N7 y, Q& Q
  5. $worker = new Worker('websocket://0.0.0.0:1234');; N1 Q) @- \( s$ ?/ [1 b
  6. 1 z* t( w( v0 N5 c- C
  7. /*3 }$ _6 d+ M) }% K# W. p
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误5 C/ y: \! k' d
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)" w5 L& C) V) b9 w& A
  10. */
    , d% b- h5 t2 E* R% E
  11. $worker->count = 1;( x# T$ a* _" D/ U5 @" _
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口% d1 p( K: U- z! C: M! l
  13. $worker->onWorkerStart = function($worker)
    $ \4 `' ?/ {4 V$ r- a2 X
  14. {
    ) N8 ^! v: t5 O8 c7 m& U
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) H' X$ J/ `. q$ U
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    " Y  Z1 `* [4 `* M2 g' R8 l3 j
  17.     $inner_text_worker->onMessage = function($connection, $buffer)& ^% h" M2 d* l2 V& Y
  18.     {& H7 K8 K% {$ I5 n' J
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    - V9 h) }1 B/ I3 K2 A
  20.         $data = json_decode($buffer, true);
    5 U' K1 H- G% |% y
  21.         $uid = $data['uid'];
    3 w1 H: B8 s$ w, i  l+ ~
  22.         // 通过workerman,向uid的页面推送数据
    ! ^+ @( p! e& D0 `/ p
  23.         $ret = sendMessageByUid($uid, $buffer);+ B1 ?0 t& a5 `+ `
  24.         // 返回推送结果) `) q# q, C1 Z; q# |9 J
  25.         $connection->send($ret ? 'ok' : 'fail');
    % \0 H, Q$ A1 G/ b: c5 @& I4 [
  26.     };
    : _% \( M; E( m! c
  27.     // ## 执行监听 ##* J  k7 s* K$ t% C+ [5 x
  28.     $inner_text_worker->listen();) ]$ T' V) g1 j* {+ K
  29. };5 t# N) v4 b1 N9 S" I
  30. // 新增加一个属性,用来保存uid到connection的映射
    6 R' U- P7 M. a/ A! j+ \5 T
  31. $worker->uidConnections = array();
    , H) V2 N2 g, b+ P7 l4 \. _# r: g
  32. // 当有客户端发来消息时执行的回调函数+ T% o, h. i8 F8 K5 ~
  33. $worker->onMessage = function($connection, $data); m3 E0 k7 |, s2 W' I4 T- K# S
  34. {
    0 [* v+ ^1 o; v% f
  35.     global $worker;" B$ }3 _  D; f! v4 x3 P
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    * |( I% Y. @+ i
  37.     if(!isset($connection->uid))0 [# e' }* ?4 G& O- |, G  n" q
  38.     {
    % K; M8 `1 t7 r. L. g. V
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 G  {. W$ U. O- w7 J7 S& T; ?
  40.        $connection->uid = $data;
    : n& ]3 \7 p! D( T5 ^
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    8 B  [! R7 }1 k/ C4 T: i
  42.         * 实现针对特定uid推送数据1 n6 U  V; m' a$ Y% r' E2 B& n! O
  43.         */8 r1 z6 E4 R: w! p
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ; P! l' y( K/ M* ^1 t/ a5 O' B# \- T
  45.        return;, x: u+ x: E( {$ O* t7 X. a7 H
  46.     }
    ' k. Y6 G. o, `6 U" k+ @
  47. };* e3 x: X7 g: T4 @, b8 `  s
  48. * C* C) @5 l1 x  I. }8 b1 G
  49. // 当有客户端连接断开时0 I3 F' z( v  i9 {/ E. [
  50. $worker->onClose = function($connection). s9 R/ }7 r9 P6 p/ a
  51. {
    1 N& D" A. r( z" J, s5 Y6 x
  52.     global $worker;
    3 o& ^, y/ T. Z1 k$ e5 M' e
  53.     if(isset($connection->uid))1 M, G9 L% Y" Y8 X# V
  54.     {5 a8 x& h2 h, H  h6 c; @( G
  55.         // 连接断开时删除映射  S, t% \; L% w; n( J
  56.         unset($worker->uidConnections[$connection->uid]);
    ' W) y, D6 t1 {3 x( B7 f- w: Z
  57.     }3 D2 g2 E2 T; w% A9 Y
  58. };
    4 H* Z7 m; Z7 S9 `, [( V3 G
  59. 6 \" u- i6 M  v. M" _0 _6 E4 `
  60. // 向所有验证的用户推送数据
    # u6 j9 \) R, h) L1 {2 f9 t5 i
  61. function broadcast($message): \2 `& \' f$ p" W
  62. {, I- @! o6 y: D1 ]. V% b6 E6 M4 ]
  63.    global $worker;
    ) l( L0 M6 C% o8 T$ U
  64.    foreach($worker->uidConnections as $connection)* K* L: n1 y% c4 S: y8 i
  65.    {
    : z. ?; x: s* X/ ?: v% d/ l
  66.         $connection->send($message);
    ; K7 t$ \8 m2 b7 i7 U, X/ {
  67.    }3 N9 h2 Q! Q& y8 ^1 U# `; V
  68. }$ Z- j' }# ~& \3 U
  69. ; {# S7 y7 e! ]9 |4 T, E' A5 k" J4 m
  70. // 针对uid推送数据/ {; G6 Q/ ?4 a3 F" G/ u
  71. function sendMessageByUid($uid, $message)
    ' E+ C9 D: a' ?
  72. {
    2 F3 k4 g# s( u6 E
  73.     global $worker;
    & b( D1 o+ N2 R8 n
  74.     if(isset($worker->uidConnections[$uid]))
    7 }! ]- o6 c5 A1 g$ B& G: b6 g
  75.     {
    ! n3 S7 S$ q% p+ _) p
  76.         $connection = $worker->uidConnections[$uid];: H1 [+ k4 i9 G. A5 b) R" o& P) m
  77.         $connection->send($message);
    & ]4 |- u7 q5 f1 [$ f: v% A: B% U
  78.         return true;2 d: J$ j! X/ J& z* T; n
  79.     }
    1 l- c- n! E+ Z% u( |
  80.     return false;
    4 |: k4 K6 I3 |$ A& m0 D' k7 d
  81. }
    8 y5 [, x) t6 y/ L6 j1 r- ~
  82. 1 Y; @+ I; h/ R% o5 b6 o' [/ w
  83. // 运行所有的worker: K* @8 d8 A! f8 M9 U3 R- I
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');' s! b4 @9 u/ i2 o# `
  2. ws.onopen = function(){5 H* ]+ o- _9 t3 Q) Z
  3.     var uid = 'uid1';4 f! D0 _, n5 @$ s# p
  4.     ws.send(uid);
    % v2 _. C: C  q: o$ j
  5. };
    : H7 r( v% M9 |0 U# p% x  C- @# q
  6. ws.onmessage = function(e){7 K6 z3 }2 j9 T9 |" j  r9 i7 b- I
  7.     alert(e.data);
    ' j: M7 a9 P8 W( X" r5 W  y' X
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口7 O5 V" x4 e0 R1 `0 l
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);* K; h, \5 b. E9 N" h2 V
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    + ]8 W$ c4 s8 g) _% q9 a
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    3 d8 s5 T- S6 L* Q
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ f' p6 G" W3 X) I0 B
  6. fwrite($client, json_encode($data)."\n");3 d! R/ @7 B. s/ {% Z4 S0 ?
  7. // 读取推送结果8 ^( F! M3 B8 H3 \
  8. echo fread($client, 8192);
复制代码
% i3 Q4 j8 x9 ^9 n% R# ~
3 _  |( k# d9 V6 X' z1 u/ t
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 20:29 , Processed in 0.106851 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!