Mercurial > hg > graal-compiler
view graal/com.oracle.truffle.ruby.runtime/src/com/oracle/truffle/ruby/runtime/core/RubyFiber.java @ 13514:0fbee3eb71f0
Ruby: import project.
author | Chris Seaton <chris.seaton@oracle.com> |
---|---|
date | Mon, 06 Jan 2014 17:12:09 +0000 |
parents | |
children |
line wrap: on
line source
/* * Copyright (c) 2013 Oracle and/or its affiliates. All rights reserved. This * code is released under a tri EPL/GPL/LGPL license. You can use it, * redistribute it and/or modify it under the terms of the: * * Eclipse Public License version 1.0 * GNU General Public License version 2 * GNU Lesser General Public License version 2.1 */ package com.oracle.truffle.ruby.runtime.core; import java.util.concurrent.*; import com.oracle.truffle.api.nodes.*; import com.oracle.truffle.ruby.runtime.*; import com.oracle.truffle.ruby.runtime.core.array.*; import com.oracle.truffle.ruby.runtime.objects.*; import com.oracle.truffle.ruby.runtime.subsystems.*; /** * Represents the Ruby {@code Fiber} class. The current implementation uses Java threads and message * passing. Note that the relationship between Java threads, Ruby threads and Ruby fibers is * complex. A Java thread might be running a fiber that on difference resumptions is representing * different Ruby threads. Take note of the lock contracts on {@link #waitForResume} and * {@link #resume}. */ public class RubyFiber extends RubyObject { public static class RubyFiberClass extends RubyClass { public RubyFiberClass(RubyClass objectClass) { super(null, objectClass, "Fiber"); } @Override public RubyBasicObject newInstance() { return new RubyFiber(this, getContext().getFiberManager(), getContext().getThreadManager()); } } private interface FiberMessage { } private class FiberResumeMessage implements FiberMessage { private final RubyThread thread; private final RubyFiber sendingFiber; private final Object arg; public FiberResumeMessage(RubyThread thread, RubyFiber sendingFiber, Object arg) { this.thread = thread; this.sendingFiber = sendingFiber; this.arg = arg; } public RubyThread getThread() { return thread; } public RubyFiber getSendingFiber() { return sendingFiber; } public Object getArg() { return arg; } } private class FiberExitMessage implements FiberMessage { } public class FiberExitException extends ControlFlowException { private static final long serialVersionUID = 1522270454305076317L; } private final FiberManager fiberManager; private final ThreadManager threadManager; private BlockingQueue<FiberMessage> messageQueue = new ArrayBlockingQueue<>(1); public RubyFiber lastResumedByFiber = null; public RubyFiber(RubyClass rubyClass, FiberManager fiberManager, ThreadManager threadManager) { super(rubyClass); this.fiberManager = fiberManager; this.threadManager = threadManager; } public void initialize(RubyProc block) { final RubyFiber finalFiber = this; final RubyProc finalBlock = block; new Thread(new Runnable() { @Override public void run() { fiberManager.registerFiber(finalFiber); try { try { final Object arg = finalFiber.waitForResume(); final Object result = finalBlock.call(null, arg); finalFiber.lastResumedByFiber.resume(finalFiber, result); } catch (FiberExitException e) { // Naturally exit the thread on catching this } } finally { fiberManager.unregisterFiber(finalFiber); } } }).start(); } /** * Send the Java thread that represents this fiber to sleep until it recieves a resume or exit * message. On entry, assumes that the GIL is not held. On exit, holding the GIL. */ public Object waitForResume() { FiberMessage message = null; do { try { // TODO(cs) what is a suitable timeout? message = messageQueue.poll(1, TimeUnit.SECONDS); } catch (InterruptedException e) { // Poll again } } while (message == null); if (message instanceof FiberExitMessage) { throw new FiberExitException(); } final FiberResumeMessage resumeMessage = (FiberResumeMessage) message; threadManager.enterGlobalLock(resumeMessage.getThread()); fiberManager.setCurrentFiber(this); lastResumedByFiber = resumeMessage.getSendingFiber(); return resumeMessage.getArg(); } /** * Send a message to a fiber by posting into a message queue. Doesn't explicitly notify the Java * thread (although the queue implementation may) and doesn't wait for the message to be * received. On entry, assumes the the GIL is held. On exit, not holding the GIL. */ public void resume(RubyFiber sendingFiber, Object... args) { Object arg; if (args.length == 0) { arg = NilPlaceholder.INSTANCE; } else if (args.length == 1) { arg = args[0]; } else { arg = RubyArray.specializedFromObjects(getRubyClass().getContext().getCoreLibrary().getArrayClass(), args); } final RubyThread runningThread = threadManager.leaveGlobalLock(); messageQueue.add(new FiberResumeMessage(runningThread, sendingFiber, arg)); } public void shutdown() { messageQueue.add(new FiberExitMessage()); } }