Commit bf932cc4 authored by Emmanuel Bourg's avatar Emmanuel Bourg

New upstream version 1.22

parent 1faa2438
......@@ -24,15 +24,20 @@
<parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-project</artifactId>
<version>1.20</version>
<version>1.22</version>
</parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-example</artifactId>
<version>1.20</version>
<version>1.22</version>
<description>HawtDispatch: Examples</description>
<properties>
<scala-version>2.10.0</scala-version>
<scalatest-version>1.8</scalatest-version>
</properties>
<dependencies>
<dependency>
......@@ -44,12 +49,12 @@
<dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-scala</artifactId>
<version>1.20</version>
<version>1.22</version>
</dependency>
<dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-transport</artifactId>
<version>1.20</version>
<version>1.22</version>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2012 FuseSource, Inc.
http://fusesource.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-project</artifactId>
<version>1.22</version>
</parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-scala-2.11</artifactId>
<version>1.22</version>
<packaging>bundle</packaging>
<description>HawtDispatch: The libdispatch style API for Scala</description>
<properties>
<scala-version>2.11.0</scala-version>
<scalatest-version>2.1.5</scalatest-version>
</properties>
<dependencies>
<dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch</artifactId>
<version>1.22</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala-version}</version>
<scope>provided</scope>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>${scalatest-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<sourceDirectory>../hawtdispatch-scala/src/main/scala</sourceDirectory>
<resources>
<resource>
<directory>../hawtdispatch-scala/src/main/resources</directory>
</resource>
</resources>
<testSourceDirectory>../hawtdispatch-scala/src/test/scala</testSourceDirectory>
<testResources>
<testResource>
<directory>../hawtdispatch-scala/src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<inherited>true</inherited>
<configuration>
<instructions>
<Fragment-Host>${project.groupId}.hawtdispatch</Fragment-Host>
<Export-Package>{local-packages};version=${project.version};-noimport:=true;-split-package:=first</Export-Package>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scaladocClassName>scala.tools.nsc.ScalaDoc</scaladocClassName>
<jvmArgs>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<args>
<!-- <arg>-optimise</arg> -->
<arg>-deprecation</arg>
</args>
<scalaVersion>${scala-version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
<useSystemClassLoader>false</useSystemClassLoader>
<!--forkMode>pertest</forkMode-->
<childDelegation>false</childDelegation>
<useFile>true</useFile>
<failIfNoTests>false</failIfNoTests>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<scalaVersion>${scala-version}</scalaVersion>
<jvmArgs>
<jvmArg>-Xmx850M</jvmArg>
</jvmArgs>
</configuration>
</plugin>
</plugins>
</reporting>
<profiles>
<!-- helper for the IDEA scala integration, enable this profile in IDEA -->
<profile>
<id>idea</id>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<args>
<arg>-deprecation</arg>
</args>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>release</id>
<build>
<plugins>
<!-- Generate the Scala Docs so that they can be included in the javadoc artifact -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>doc</id>
<phase>process-classes</phase>
<goals>
<goal>doc</goal>
</goals>
<configuration>
<reportOutputDirectory>${project.build.directory}</reportOutputDirectory>
<outputDirectory>apidocs</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
......@@ -24,22 +24,27 @@
<parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-project</artifactId>
<version>1.20</version>
<version>1.22</version>
</parent>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch-scala</artifactId>
<version>1.20</version>
<version>1.22</version>
<packaging>bundle</packaging>
<description>HawtDispatch: The libdispatch style API for Scala</description>
<properties>
<scala-version>2.10.0</scala-version>
<scalatest-version>1.8</scalatest-version>
</properties>
<dependencies>
<dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
<artifactId>hawtdispatch</artifactId>
<version>1.20</version>
<version>1.22</version>
</dependency>
<dependency>
......@@ -110,16 +115,8 @@
<args>
<!-- <arg>-optimise</arg> -->
<arg>-deprecation</arg>
<arg>-P:continuations:enable</arg>
</args>
<scalaVersion>${scala-version}</scalaVersion>
<compilerPlugins>
<compilerPlugin>
<groupId>org.scala-lang.plugins</groupId>
<artifactId>continuations</artifactId>
<version>${scala-version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>
<plugin>
......@@ -168,15 +165,7 @@
<configuration>
<args>
<arg>-deprecation</arg>
<arg>-P:continuations:enable</arg>
</args>
<compilerPlugins>
<compilerPlugin>
<groupId>org.scala-lang.plugins</groupId>
<artifactId>continuations</artifactId>
<version>${scala-version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>
</plugins>
......
......@@ -19,7 +19,6 @@ package org.fusesource
import org.fusesource.hawtdispatch._
import java.nio.channels.SelectableChannel
import scala.util.continuations._
import java.util.concurrent.{ExecutorService, CountDownLatch, Executor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.io.Closeable
......@@ -127,32 +126,6 @@ package object hawtdispatch {
rc
}
/**
* Executes the supplied function on this executor. If not called from a
* runnable being exectued in a Dispatch Queue, then is call blocks
* until continuation is executed. Otherwise, the continuation is
* resumed on the original calling dispatch queue once supplied function
* completes.
*/
def ![T](func: =>T): T @suspendable = shift { k: (T=>Unit) =>
val original = getCurrentQueue
if( original==null ) {
k(sync(func))
} else {
apply {
val result = func
original.apply {
k(result)
}
}
}
}
/**
* Same as {@link #future(=>T)} except that the partial function is wrapped in a {@link reset} block.
*/
def !![T](func: =>T @suspendable):Future[T] = reset_future { func }
}
class RichDispatchSource(val actual:DispatchSource) extends Proxy with RichDispatchObject {
......@@ -358,15 +331,4 @@ package object hawtdispatch {
}
}
/**
* resets a CPS block, and returns it's result in a future.
*/
def reset_future[T](func: =>T @suspendable):Future[T] = {
val rc = Future[T]()
reset {
val r = func
rc(r)
}
rc
}
}
/**
* Copyright (C) 2012 FuseSource, Inc.
* http://fusesource.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fusesource.hawtdispatch
import util.continuations._
import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel}
import collection.mutable.ListBuffer
import java.nio.ByteBuffer
import java.net.SocketAddress
import java.io.IOException
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
case class HawtServerSocketChannel(channel: ServerSocketChannel, queue: DispatchQueue = createQueue()) {
channel.configureBlocking(false);
var accept_requests = ListBuffer[(Either[SocketChannel, IOException]) => Unit]()
val accept_source = createSource(channel, SelectionKey.OP_ACCEPT, queue)
accept_source.onEvent {
var source_drained = false
while (!source_drained && !accept_requests.isEmpty) {
val k = accept_requests.head
try {
val socket = channel.accept
if (socket == null) {
source_drained = true
} else {
accept_requests = accept_requests.drop(1)
k(Left(socket))
}
} catch {
case e: IOException =>
k(Right(e))
}
}
if (accept_requests.isEmpty) {
accept_source.suspend
}
}
def accept: Either[SocketChannel, IOException]@suspendable = shift { k:(Either[SocketChannel, IOException]=>Unit) =>
queue {
if( canceled ) {
k(Right(new IOException("canceled")))
} else {
if (accept_requests.isEmpty) {
accept_source.resume
}
accept_requests.append(k)
}
}
}
var canceled = false
var cancel_requests = ListBuffer[ Unit => Unit]()
accept_source.onCancel {
canceled = true
accept_requests.foreach(_(Right(new IOException("canceled"))))
accept_requests.clear
cancel_requests.foreach(_({}))
cancel_requests.clear
}
def cancel: Unit @suspendable = shift { k:(Unit=>Unit) =>
queue {
if( canceled ) {
k({})
} else {
cancel_requests.append(k)
accept_source.cancel
}
}
}
}
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
case class HawtSocketChannel(channel: SocketChannel, queue: DispatchQueue = createQueue()) {
channel.configureBlocking(false);
var connect_request: (Option[IOException])=>Unit = _
var connect_source:DispatchSource = _
def connect(address:SocketAddress): Option[IOException]@suspendable = shift { k:(Option[IOException]=>Unit) =>
queue {
if( canceled ) {
k(Some(new IOException("canceled")))
} else {
try {
if( channel.connect(address) ) {
k(None)
} else {
connect_request = k
connect_source = createSource(channel, SelectionKey.OP_CONNECT, queue)
def check = if( !connect_source.isCanceled ) {
if( connect_source!=null ) {
try {
if(channel.finishConnect) {
connect_source.cancel
connect_source = null
connect_request = null
k(None)
}
} catch {
case e:IOException =>
connect_source.cancel
connect_source = null
connect_request = null
k(Some(e))
}
}
}
connect_source.onEvent {
check
}
connect_source.resume
// TODO: perhaps we should introduce a connect timeout...
// def schedual_check:Unit = queue.after(5,TimeUnit.SECONDS) {
// check
// if( connect_source==null ) {
// schedual_check
// }
// }
// schedual_check
}
} catch {
case e:IOException => k(Some(e))
}
}
}
}
var read_requests = ListBuffer[(ByteBuffer, (Option[IOException]) => Unit)]()
val read_source = createSource(channel, SelectionKey.OP_READ, queue)
read_source.onEvent {
var source_drained = false
while (!source_drained && !read_requests.isEmpty) {
val k = read_requests.head
try {
val remaining = k._1.remaining
val count = channel.read(k._1)
if (count == 0 && remaining > 0 ) {
source_drained = true
} else {
read_requests = read_requests.drop(1)
k._2(None)
}
} catch {
case e: IOException =>
k._2(Some(e))
}
}
if (read_requests.isEmpty) {
read_source.suspend
}
}
def read(buffer:ByteBuffer): Option[IOException]@suspendable = shift { k:(Option[IOException]=>Unit) =>
queue {
if( canceled ) {
k(canceled_exception)
} else {
if (read_requests.isEmpty) {
read_source.resume
}
read_requests.append((buffer,k))
}
}
}
var write_requests = ListBuffer[(ByteBuffer, (Option[IOException]) => Unit)]()
val write_source = createSource(channel, SelectionKey.OP_WRITE, queue)
write_source.onEvent {
var source_drained = false
while (!source_drained && !write_requests.isEmpty) {
val k = write_requests.head
try {
val remaining = k._1.remaining
val count = channel.write(k._1)
if (count == 0 && remaining > 0 ) {
source_drained = true
} else {
if( k._1.remaining==0 ) {
write_requests = write_requests.drop(1)
k._2(None)
}
}
} catch {
case e: IOException =>
k._2(Some(e))
}
}
if (write_requests.isEmpty) {
write_source.suspend
}
}
def write(buffer:ByteBuffer): Option[IOException]@suspendable = shift { k:(Option[IOException]=>Unit) =>
queue {
if( canceled ) {
k(canceled_exception)
} else {
if (write_requests.isEmpty) {
write_source.resume
}
write_requests.append((buffer,k))
}
}
}
var canceled = false
var cancel_requests = ListBuffer[ Unit => Unit]()
read_source.onCancel {
write_source.cancel
}
write_source.onCancel {
canceled = true
if( connect_source!=null ) {
connect_request(canceled_exception)
connect_request = null
connect_source.cancel
connect_source = null
}
read_requests.foreach(_._2(canceled_exception))
read_requests.clear
write_requests.foreach(_._2(canceled_exception))
write_requests.clear
cancel_requests.foreach(_({}))
cancel_requests.clear
}
def canceled_exception=Some(new IOException("canceled"))
def cancel:Unit @suspendable = shift { k:(Unit=>Unit) =>
queue {
if( canceled ) {
k({})
} else {
cancel_requests.append(k)
read_source.cancel
}
}
}
}
\ No newline at end of file
/**
* Copyright (C) 2012 FuseSource, Inc.
* http://fusesource.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fusesource.hawtdispatch
import org.scalatest._
import junit.JUnitRunner
import org.junit.runner.RunWith
import org.scalatest.matchers.ShouldMatchers
import java.util.concurrent.CountDownLatch
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@RunWith(classOf[JUnitRunner])
class ContinuationTest extends FunSuite with ShouldMatchers {
test("Continuation Test") {
object Foo {
var held:Int = 0;
val a = createQueue()
def hold(v:Int) = a ! {
val rc = held
held = v
rc
}
}
object Bar {
var sum:Int = 0;
val b = createQueue()
def apply() = b !! {
val result = Foo.hold(sum+5)