您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12155|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    8 G- f- e' L' ]) h9 {
  2. require_once __DIR__ . '/Workerman/Autoloader.php';+ h: f5 _* g3 d0 n  W9 W& c
  3. % i  X/ j  N. ~6 e! M
  4. $worker = new Worker();6 q" }9 S" }: L5 I9 F$ }
  5. // 4个进程) c& e  P1 E' P2 _2 x3 J
  6. $worker->count = 4;
    . F5 ?+ M$ p. N0 P$ M  x
  7. // 每个进程启动后在当前进程新增一个Worker监听9 _$ u- D2 U- R4 a
  8. $worker->onWorkerStart = function($worker)
    0 S/ f2 k& Y" }$ O( L4 i* r
  9. {* L  b7 g3 C$ X! g+ a0 |: A& ]
  10.     /**
    ) a8 w) x* k+ q
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 F0 Z, Q$ {- o  k/ d. s  T
  12.      * 当执行到worker->listen()时会报Address already in use错误3 Q3 M) z) _9 f, T* C
  13.      * 如果worker->count=1则不会报错
    ( c  D; |9 s0 w2 q5 d3 E
  14.      */
    / E* J& Z; H* z6 U
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');5 [' K' C: B. ]+ Z- f
  16.     $inner_worker->onMessage = 'on_message';
    ( q' M, ]$ G: A8 z' A! l* h
  17.     // 执行监听。这里会报Address already in use错误9 r/ j3 v8 R/ [/ x, G6 P
  18.     $inner_worker->listen();& d* I; V( w, c: ^; ]
  19. };
    3 ]9 j% s3 y5 o
  20. 7 [- [  z7 ]( E* k& R
  21. $worker->onMessage = 'on_message';
    ; x' B; d9 f0 v9 z- \  o
  22. 7 Y; n( f0 @# e, ]- V
  23. function on_message($connection, $data)1 ]4 {5 ^2 o3 k! |- u6 Z  r* o
  24. {
    3 G5 B9 S# _$ y7 G: g; f5 W
  25.     $connection->send("hello\n");
    # ^+ R- g2 `- C5 g0 P
  26. }/ @' d8 Z& \' P' c* I& t" b9 \

  27. " I8 H" r: N  O) }4 N- U
  28. // 运行worker
    4 b- ]) _3 e8 `8 U( q
  29. Worker::runAll();
      K( Q! B/ [& U8 M' W( V
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    + S: q6 A; r, T: s# l

  31. / m% w( I# u# L6 ~7 J  d' r
  32. use Workerman\Worker;
    ) u4 I7 q" W, y% p; _
  33. require_once './Workerman/Autoloader.php';
    ( V. x1 C; k1 E7 s

  34. 6 a/ e+ J( ^5 E3 Z, n7 A3 T' w
  35. $worker = new Worker('text://0.0.0.0:2015');
    8 f: o2 t$ G5 G
  36. // 4个进程
    1 x8 J+ o3 B/ h3 E9 _4 V- R+ W
  37. $worker->count = 4;! |, e; }$ e8 C) l, K+ B
  38. // 每个进程启动后在当前进程新增一个Worker监听
    & t+ x4 K% y: y/ u+ x
  39. $worker->onWorkerStart = function($worker)1 w" D: E+ W' K) h
  40. {8 X  M6 I- n* Y5 U: ~3 v1 N$ b
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');( E3 c6 u) U  [9 o* A
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ! E3 o7 Z$ t4 F
  43.     $inner_worker->reusePort = true;' A9 I: e3 T% \8 S# M0 R2 c1 c
  44.     $inner_worker->onMessage = 'on_message';. k# F% t5 ^5 y0 ~$ n
  45.     // 执行监听。正常监听不会报错
    * G5 l$ L5 C6 D& Z, g. V
  46.     $inner_worker->listen();
    4 q( @! R5 g' Z/ h9 o: |. s+ g7 V2 g
  47. };5 V; ?: {3 {$ M5 ~- o
  48. * {7 {+ U5 I* x% B6 }
  49. $worker->onMessage = 'on_message';* K1 u6 z; X5 y- V7 i

  50. $ N$ _$ L' S: u! m& ~- V
  51. function on_message($connection, $data)
    + }" ~  m4 K3 {! W) \
  52. {
    0 w/ H' {7 j5 X+ F; x
  53.     $connection->send("hello\n");
    & c# g2 s3 b$ M, |  k
  54. }* {7 B* u4 c$ l4 d* D6 {

  55. % R! k8 P2 T0 N- _
  56. // 运行worker
    # G1 _; Y: b* s/ c
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php& A6 \# q& X2 w0 J! e6 s/ D4 j
  2. use Workerman\Worker;
    ' N; n$ z' }) P3 v0 g
  3. require_once './Workerman/Autoloader.php';5 r6 W9 x$ ~3 W$ |
  4. // 初始化一个worker容器,监听1234端口7 q, ]; v1 V7 c! ^7 p4 S
  5. $worker = new Worker('websocket://0.0.0.0:1234');  O7 w$ X8 E+ o0 l: V8 O$ x
  6. 6 w/ I  T$ l: q8 r- q
  7. /*
    5 I3 ?( q/ ]! A; D. Y  m
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ! |3 p  s7 S( }* r0 P& d% K* |6 R. Q
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ' [/ d% S) b# y1 m% C9 h! a* U; q8 }4 z
  10. */$ l% P) x6 Z/ e7 J' e
  11. $worker->count = 1;! `$ k3 ?' Y* s
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    & N* ?# i1 q8 K7 G$ R- t
  13. $worker->onWorkerStart = function($worker)7 @! x$ q" c4 `* ?6 a
  14. {
    - Z6 ^+ U& z. G
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
      Y" t* z9 G! f
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');, R, G/ {" t. ^( _
  17.     $inner_text_worker->onMessage = function($connection, $buffer)# X, k  q+ Q/ }. a( n
  18.     {% J% B5 p$ q, U5 g2 K
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    7 ]: `% P* q* h/ `
  20.         $data = json_decode($buffer, true);: z8 q1 b* |! a6 B
  21.         $uid = $data['uid'];
    % e: Q9 F% F$ R* H* S
  22.         // 通过workerman,向uid的页面推送数据. D, J: J1 E+ Q6 @
  23.         $ret = sendMessageByUid($uid, $buffer);
    " h! u7 l& w0 T1 {4 L! ^: ^
  24.         // 返回推送结果* w* H+ j  e5 Q# n& k. `: H
  25.         $connection->send($ret ? 'ok' : 'fail');
    * Y9 a* d6 @% |9 `. H4 Z6 S6 ?
  26.     };
    - D: e6 i* I' Q5 Z$ U4 I3 G9 @
  27.     // ## 执行监听 ##( W. S, M9 N3 C% l+ V9 Y8 j( A
  28.     $inner_text_worker->listen();
    % K9 J$ h3 C5 O' l6 H
  29. };) l4 \2 I3 [0 [9 O1 a4 t
  30. // 新增加一个属性,用来保存uid到connection的映射
    ; n5 k3 v) V& K* y! c
  31. $worker->uidConnections = array();7 d: t# Z+ N9 a# B
  32. // 当有客户端发来消息时执行的回调函数0 f2 G# e( P6 S3 W/ p. A
  33. $worker->onMessage = function($connection, $data)  q% S1 u' D9 S& Q  a
  34. {
    ! |8 Q9 T1 Q# R
  35.     global $worker;! q2 V, ?+ G7 x$ }5 u( P# J
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ; H$ F: c; A: g( @3 U2 z" ]0 ?. B& C
  37.     if(!isset($connection->uid))
    $ O4 I, f! g1 F( u
  38.     {
    " x" i" [; {' |) o* W/ A% v( }8 Y( o
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    / C8 U& W- n! c4 U
  40.        $connection->uid = $data;
      @! ?, i  m2 i% z* B
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    9 b" s0 `9 j" F0 s7 D4 O
  42.         * 实现针对特定uid推送数据. L, A$ r% S: J' L
  43.         */
    4 E3 @- Z7 z7 g; u- B
  44.        $worker->uidConnections[$connection->uid] = $connection;
    1 a2 U7 t) y3 L0 [
  45.        return;3 |/ E, p2 I; X' }: w6 y5 H
  46.     }8 d, }- R+ U1 u! H
  47. };
    5 T  l/ {% ?% c- g) }
  48. . C: a, U: [5 d. i. v* g
  49. // 当有客户端连接断开时
    ! m6 q% V( _. ~$ O4 Y) Z( q2 J
  50. $worker->onClose = function($connection)) e( V8 p4 {) |; u8 E
  51. {1 ^: y" L4 S4 t& t# |4 P* H( ]
  52.     global $worker;+ n! M# P* H3 b$ f% z) j9 L  I
  53.     if(isset($connection->uid))
    $ g- c2 o9 D. `+ e
  54.     {9 w0 j* s7 u$ m( h
  55.         // 连接断开时删除映射7 q9 t0 ?# Q3 B; e% C
  56.         unset($worker->uidConnections[$connection->uid]);
    # C1 T6 @  p, m) |5 N( N
  57.     }
    7 c! x8 D9 V" x  B
  58. };5 N5 Z& G+ ]( u/ `, J% A  L

  59. 4 _" m8 ?5 b3 d
  60. // 向所有验证的用户推送数据
    , p# u- ^, ~6 G9 n3 b
  61. function broadcast($message)
    ( U% N! w# S/ \! d0 i8 b  w& d
  62. {
    ' l2 }$ G! H! t+ L
  63.    global $worker;
    ( V1 r. i8 h& _( ~6 m
  64.    foreach($worker->uidConnections as $connection)+ C! f) O2 g2 d7 ]1 w
  65.    {
    5 F5 {2 ^; b$ B0 x) s1 i2 e
  66.         $connection->send($message);
    4 k  m) |! a# T
  67.    }$ A5 A  P# Z2 F. e/ ]
  68. }
    6 ?- g( t$ J# e
  69. : g/ G* q8 c. ^
  70. // 针对uid推送数据) r& ?& `" {: n6 o1 X* u; _- [
  71. function sendMessageByUid($uid, $message); n7 `5 |0 t/ X2 q
  72. {/ l& S, t* P0 D) |3 Z0 N
  73.     global $worker;) I* e, K, H4 E: m& n* `
  74.     if(isset($worker->uidConnections[$uid]))3 ?. f# c) Y: A" j: C; O0 o: A2 H$ `
  75.     {: b! g& J4 L6 x* P4 B) u' }
  76.         $connection = $worker->uidConnections[$uid];. f! w( O' h; n9 {& n( \
  77.         $connection->send($message);
    / ^; a& j, T' p$ h
  78.         return true;% D3 L9 ^' l1 r- Z8 P4 ^7 [
  79.     }
    , D( N+ t/ V6 V! h
  80.     return false;
    ( U- y+ I4 Z0 X( ~
  81. }
    3 j! |% T, S9 s3 r9 M' x( n

  82. 7 p# B# o9 J0 O2 _8 Z% N2 R& z- r, l
  83. // 运行所有的worker
    2 q: V6 d" e' i& A9 H9 V3 P; n
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ( k7 A1 @9 _( R, ~, t2 ~7 E
  2. ws.onopen = function(){
    $ R; h6 c: d. M1 [  z" N, l
  3.     var uid = 'uid1';. y9 R+ Z0 ~6 ^7 h' h* O2 J
  4.     ws.send(uid);- ^# [7 A3 _0 ?4 c! T5 H6 T, K
  5. };) X0 ~) J8 W3 B7 g
  6. ws.onmessage = function(e){
    3 y4 `/ T- E7 x8 L: s
  7.     alert(e.data);
    * U$ L8 g% J7 C  X4 F
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    + z  f# m. j* Z
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);# u& f1 d, x0 |- ]6 L; z! e+ P( z
  3. // 推送的数据,包含uid字段,表示是给这个uid推送9 e- d8 J! ]& C1 K" J
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ; {2 j4 h7 f# @
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    / h4 B% k4 E2 k4 C2 l3 I( N5 j6 H
  6. fwrite($client, json_encode($data)."\n");
    # B3 ]  V) ^5 X' u4 o
  7. // 读取推送结果' ~& M# i! Q4 ~' v0 M7 o% n9 l
  8. echo fread($client, 8192);
复制代码
# m9 |$ j2 U5 N
, ^* z( C- s& `9 i1 J0 V
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 15:24 , Processed in 0.111382 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!