异步机制(Asynchronous) -- (二)异步消息机制兼谈Hadoop RPC

By | 01月20日
Advertisement

异步机制(Asynchronous) -- (二)异步消息机制兼谈Hadoop RPC
2011年01月22日
  上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧:
  比如Client向Server发送一个request,Server收到后需要100ms的处理时间,为了方便起见,我们忽略掉网络的延迟,并且,我们认为Server端的处理能力是无穷大的。在这个use case下,如果采用同步机制,即Client发送request -> 等待结果 -> 继续发送,那么,一个线程一秒钟之内只能够发送10个request,如果希望达到10000 request/s的发送压力,那么Client端就需要创建1000个线程,而这么多线程的context switch就成为client的负担了。而采用异步机制,就不存在这个问题了。Client将request发送出去后,立即发送下一个request,理论上,它能够达到网卡发送数据的极限。当然,同时需要有机制不断的接收来自Server端的response。
  以上的例子其实就是这篇的主题,异步的消息机制,基本的流程是这样的:
  
  如果仔细琢磨的话,会发现这个流程中有两个很重要的问题需要解决:
  1. 当client接收到response后,怎样确认它到底是之前哪个request的response呢?
  2. 如果发送一个request后,这个request对应的response由于种种原因(比如server端出问题了)一直没有返回。client怎么能够发现类似这样长时间没有收到response的request呢?
  对于第一个问题,一般会尝试给每个request分配一个独一无二的ID,返回的Response会同时携带这个ID,这样就能够将request和response对应上了。
  对于第二个问题,需要有一个timeout机制,对于每一个request都有一个定时器,如果到指定时间仍然没有返回结果,那么会触发timeout操作。多说一句,timeout机制其实对于涉及网络的同步机制也是非常有必要的,因为有可能client与server之间的链接坏了,在极端情况下,client会被一直阻塞住。
  纸上谈兵了这么久,还是看一个实际的例子。我在这里用Hadoop的RPC代码举例。这里需要事先说明的是,Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。多说无益,直接看代码吧(讨论的所有代码都在org.apache.hadoop.ipc.Client.java 里): public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { //具体的代码一会再看... } 这就是Client.java对外提供的接口。一共有两个参数,param是希望发送的request,remoteId是指远程server对应的Id。函数的返回就是response(也是继承自writable)。所以说,这是一个同步调用,一旦call函数返回,那么response也就拿到了。
  call函数的具体实现一会再看,先介绍Client中两个重要的内部类: private class Call { int id; // call id Writable param; // parameter Writable value; // value, null if error IOException error; // exception, null if value boolean done; // true when call is done protected Call(Writable param) { this.param = param; synchronized (Client.this) { this.id = counter++; } } protected synchronized void callComplete() { this.done = true; notify(); // notify caller } //......... public synchronized void setValue(Writable value) { this.value = value; callComplete(); } public synchronized Writable getValue() { return value; } } call这个类对应的就是一次异步请求。它的几个成员变量:
  id: 这个就是之前提过的,对于每一个request都需要分配一个唯一标示符,这样接收到response后才能知道到底对应哪个request;
  param: 需要发送到server的request;
  value: 从server发送过来的response;
  error: 可能发生的异常(比如网络读写错误,server挂了,等等);
  done: 表示这个call是否成功完成了,即是否接收到了response; private class Connection extends Thread { private InetSocketAddress server; // server ip:port // ......... private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; //............ // currently active calls private Hashtable calls = new Hashtable(); // ....... private synchronized boolean addCall(Call call) { if (shouldCloseConnection.get()) return false; calls.put(call.id, call); notify(); return true; } private void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int id = in.readInt(); // try to read an id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); int state = in.readInt(); // read call status if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setValue(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); calls.remove(id); } else if (state == Status.FATAL.state) { // Close the connection markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); } } catch (IOException e) { markClosed(e); } } public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { while (waitForWork()) {//wait here for work - read or close connection receiveResponse(); } } catch (Throwable t) { LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); } public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //for serializing the //data to be written d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length out.write(data, 0, dataLength);//write the data out.flush(); } } catch(IOException e) { markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } } } Connection这个类要比之前的Call复杂得多,所以我省略了很多这里不会被讨论的代码。
  Connection对应于一个连接,即一个socket。但同时,它又继承自Thread,所有它本身又对应于一个线程。可以看出,在Hadoop的RPC中,一个连接对应于一个线程。先看他的成员变量:
  server: 这是远程server的地址;
  socket: 对应的socket;
  in / out: socket的输入流和输出流;
  calls: 重要的成员变量。它是一个hash表, 维护了这个connection正在进行的所有call和它们对应的id之间的关系。当读取到一个response后,就通过id在这张表中找到对应的call;
  再看看它的run()函数。这是Connection这个线程的启动函数,我贴的代码中这个函数没做任何的删减,你可以发现,刨除一些冗余代码,这个函数其实就只做了一件事:receiveResponse,即等待接收response。
  OK。回到call()这个函数,看看它到底做了什么: public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception throw wrapException(remoteId.getAddress(), call.error); } } else { return call.value; } } } 首先,它创建了一个新的call(这个call是Call类的实体,注意和call()函数的区分),然后根据remoteId找到对应的connection(Client类中维护了一个connection pool),然后调用connection.sendParam()。从前面找到这个函数,你会发现它就是将request写入到socket,发送出去。
  但值得一提的是,它使用的write是最普通的blocking IO,也是同步IO(后面会看到,它读取response也是用的blcoking IO,所以,hadoop RPC虽然是异步机制,但是采用的是同步blocking IO,所以,异步消息机制还采用什么样的IO机制是没有关系的)。
  接下来,调用了call.wait(),将线程阻塞在这里。直到在某个地方调用了call.notify(),它才重新运行起来,然后一通判断后返回call.value,即接收到的response。
  所以,剩下的问题是,到底是哪调用了call.notify()?
  回到connection的receiveResponse函数:
  首先,它从socket的输入流中读到一个id,然后根据这个id找到对应的call,调用call.setValue将从socket中读取的response放入到call的value中,然后调用calls.remove(id)将这个call从队列中移除。这里要注意的是call.setValue,这个函数将value设置好之后,调用了call.notify()!
  好了,让我们再重头将流程捋一遍:
  这里其实有两个线程,一个线程是调用Client.call(),希望向远程server发送请求的线程,另外一个线程就是connection对应的那个线程。当然,虽然有两个线程,但server对应的只有一个socket。第一个线程创建call,然后调用call.sendParam将request通过这个socket发送出去;而第二个线程不断的从socket中读取response。因此,request的发送和response的接收被分隔到不同的线程中执行,而且这两个线程之间关于socket的读写并没有任何的同步机制,因此我认为这个RPC是异步消息机制实现的,只不过通过call.wait()/call.notify()使得对外的接口看上去像是同步。
  好了,Hadoop的RPC介绍完了(虽然我略掉了很多内容,比如timeout机制我这里就没写),说说我个人的评价吧。我认为,Hadoop的这个设计还是挺巧妙的,底层采用的是异步机制,但对外的接口提供的又是一般人比较习惯的同步方式。但是,我觉着缺点不是没有,一个问题是一个链接就要产生一个线程,这个如果是在几千台的cluster中,仍然会带来巨大的线程context switch的开销;另一个问题是对于同一个remote server只有一个socket来进行数据的发送和接收,这样的设计网络的吞吐量很有可能上不去。(一家之言,欢迎指正)
  未完待续~

Similar Posts:

  • Android 线程消息机制深入分析

    Android 线程消息机制深入分析 Android线程消息机制是本文所要讨论的内容,在此之前我们需要先简单介绍下(之后会详细说明)线程消息机制中的四个主要成员,它们分别是Looper.Handler.Message和MessageQueue: Looper是消息循环处理器,它负责从MessageQueue(消息队列)中提取一个Message对象进行处理. Handler是消息发送者,它负责将Message发送到MessageQueue中等候被处理. Message是消息载体,其内部保存了由我们

  • Android消息机制(基于源码解析)

    1. 消息机制概述 Android中的消息机制主要指的是Handler的运行机制,Handler的运行需要底层的MessageQueue和Looper.Message的支撑,下文会逐一分析. 2. 为什么需要消息机制 Android中的消息机制主要是为了满足线程间通信而设计的,最重要的应用场景应该在于更新UI Android规定访问UI只能在主线程中进行,如果在子线程中访问UI,那么程序就会抛出异常 系统为什么不允许在自线程中访问UI呢?这是因为Android的UI控件不是线程安全的,如果在多线

  • 十、Android的消息机制

    从开发的角度来说,Handler是Android消息机制的上层接口,这使得在开发过程中只需要和Handler交互即可. Handler的使用过程很简单,通过它可以轻松地将一个任务切换到Handler所在的线程中去执行. Android的消息机制主要指Handler的运行机制,Handler的运行需要底层的MessageQueue和Looper的支撑. MessageQueue的中文翻译是消息队列,,它的内部存储了一组消息,以队列的形式对外提供插入和删除的工作.虽然叫消息队列,但是它的内部存储结构

  • Android开发——Android的消息机制详解

    1. 我们为什么需要Android的消息机制 我们知道,Android规定访问UI只能在主线程中进行.若在子线程中访问UI,就会抛出异常.这个验证由ViewRootImpl的checkThread方法来完成. 为什么不允许在非主线程访问UI呢,这是因为Android的UI控件不是线程安全的.并发访问会导致控件处于不可预期的状态. 那为什么不对UI访问加上锁机制呢,原因如下: (1)这显然会让UI访问的逻辑变得极其复杂: (2)除了效率问题,锁机制还会阻塞某些进程的执行. 但是Android又不建

  • Android开发Handler消息机制深入探究

    提到Handler,大家不免会想到更新主界面或者延时操作.其实handler在安卓开发中扮演的主要角色就是随时更新主界面的.为什么要采用handler来更新主界面呢,这里又要谈到主线程的不安全原则. 我们知道一个应用启动时系统会为其创建一个进程,而每一Activity启动的时候又会形成一个线程,这个线程叫主线程.安卓的主线程是不安全的,因为从主线程中可以创建多个子线程来分配任务,一个activity的所有view都是唯一的,都有唯一的标识,如果在每个子线程中更新view,我们不能预知线程执行结果

  • 拨云见日---android异步消息机制源码分析

    做过windows GUI的同学应该清楚,一般的GUI操作都是基于消息机制的,应用程序维护一个消息队列,开发人员编写对应事件的回调函数就能实现我们想要的操作 其实android系统也和windows GUI一样,也是基于消息机制,今天让我们通过源码来揭开android消息机制的神秘面纱 谈起异步消息,就不能不提及Handler,在安卓中,由于主线程中不能做耗时操作,所以耗时操作必须让子线程执行,而且只能在主线程(即UI线程)中执行UI更新操作,通过Handler发送异步消息,我们就能更新UI,一

  • [置顶] Handler异步消息机制实例解析

    用几个简短的例子将Handler相关的知识点整理下,方便以后查阅也方便大家快速掌握本知识点. 名词解释: Message 是线程之间传递的消息,它可以在内部携带少量信息,用于在不同线程之间交换数据. MessageQueue 是消息队列,它主要用于存放所有由 Handler发送过来的消息,这部分消息会一直在消息队列中,等待被处理. Handler 即处理者,它主要用于发送和处理消息.发送消息一般使用 handler 的 sendMessage(),处理消息会调用 handleMessage().

  • WPF的消息机制(二)- WPF内部的5个窗口之隐藏消息窗口

    目录 WPF的消息机制(一)-让应用程序动起来 WPF的消息机制(二)-WPF内部的5个窗口 (1)隐藏消息窗口 (2)处理激活和关闭的消息的窗口和系统资源通知窗口 (3)用于用户交互的可见窗口 (4)用于UI窗口绘制的可见窗口 WPF的消息机制(三)-WPF输入事件的来源 WPF的消息机制(四)-WPF中UI的更新 WPF内部的5个窗口 对于Windows系统来说,它是一个消息系统,消息系统的核心就是窗口.对于WPF来说也是如此.那么WPF内部为什么需要窗口,又存在哪些窗口呢? 在上一篇,我们

  • 【风】的ACE笔记(6) Proactor机制下的异步SOCKET开发

    ACE笔记(6) Proactor机制下的异步SOCKET开发 Proactor机制和reactor机制的不同 1.在reactor机制下,所有I/O请求是同步的,即接到信号请求后,立即执行信号处理, 执行完后才开始继续监听信号请求,其接收信号请求的机制是被动的 而在Proactor机制下,I/O请求是异步的,即接到信号请求后,不立即执行信号处理(而是在莫个时刻执行该处理), 然后再继续监听信号请求,其接收信号请求的机制是主动的 2.要想符合Proactor机制的信号处理,需要从 ACE_Ser

  • NSObject头文件解析 - 消息机制 - Runtime解读 (二)

    本章接着NSObject头文件解析 / 消息机制 / Runtime解读(一)写 给类添加属性: BOOL class_addProperty(Class cls, const char *name, const objc_property_attribute_t *attributes, unsigned int attributeCount) 其中有一个参数我们再在上一篇中提到过 typedef struct { const char *name;           /**< The na

Tags: