The multi-core buzz is everywhere. Pick up a newspaper and you'll see the local electronics mega-store advertising multi-core desktops and laptops to consumers. Interesting, but what does it mean to the everyday Java programmer? Maybe nothing. If you live in the application server world writing EJB-based applications, your application server does most of the heavy lifting for you; it handles concurrency just fine. But that doesn't cover all applications. Multi-core technology will especially affect applications that must process large amounts of data in a non-transactional manner (outside of a database context). For this class of applications, the implications of multi-core are huge.
Why? First, look at the clock speeds of multi-core processors. They're not getting faster. In fact, they may be slowing down. As manufacturers add more cores to a chip, the clock speed of each core is usually reduced to prevent overheating. The 80-core chip that Intel demonstrated recently wasn't built from 80 x86 cores but from cores with a simpler architecture. This may become an industry trend as more and more cores are squeezed onto a chip. The processor architecture may become heterogeneous, with a few full-power "legacy" cores and many specialized cores. The IBM Cell architecture already employs this scheme, with a single PowerPC core at the center and eight SPU cores connected by a high-speed interconnect.
One implication you should take away from all of these processor changes: your single-threaded application may actually slow down on a multi-core system. If you need faster runtimes to meet shrinking SLA windows, you'll have to multithread your applications, now! No problem, right? Java has included the java.util.concurrent package since Java 5. This library contains many powerful constructs from which you can build a fully concurrent and scalable application. But that isn't always easy or straightforward. The java.util.concurrent package is a set of building blocks that you must master and put to good use. There are several good books on this subject; for one, we highly recommend Java Concurrency in Practice by Brian Goetz.
There's a technology that's been around for years called dataflow that can solve the multi-core dilemma. How? This article will go into detail about dataflow, but the gist is this: dataflow provides a functional parallelism paradigm that fits well with multi-core processor architecture. A dataflow instance consists of a directed graph of processing nodes connected with FIFO data queues. This pipelined architecture lets applications be built from small, reusable components stitched together with queues. The diagram below gives a small example of a dataflow graph. Since it's a pipelined architecture, it naturally takes advantage of many processing cores. But more on that later.
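To make the "nodes connected with FIFO queues" idea concrete, here is a minimal sketch in plain Java, not the API of any dataflow product: three nodes (source, transform, sink), each running on its own thread, joined by java.util.concurrent BlockingQueues that act as the graph's edges. The class name PipelineSketch and the end-of-stream sentinel EOS are illustrative assumptions. A dataflow framework would normally create these threads and queues for you; they are written out by hand here only to show the plumbing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineSketch {
    // Hypothetical end-of-stream marker, compared by identity so real data can never collide with it.
    static final String EOS = new String("<eos>");

    public static List<String> run(List<String> input) throws InterruptedException {
        BlockingQueue<String> rawQ = new ArrayBlockingQueue<>(16);   // edge: source -> transform
        BlockingQueue<String> upperQ = new ArrayBlockingQueue<>(16); // edge: transform -> sink
        List<String> sink = new ArrayList<>();

        // Node 1: source; emits each record, then the end-of-stream marker.
        Thread source = new Thread(() -> {
            try {
                for (String s : input) rawQ.put(s);
                rawQ.put(EOS);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        // Node 2: transform; runs whenever a record is available on its input edge.
        Thread transform = new Thread(() -> {
            try {
                for (String s = rawQ.take(); s != EOS; s = rawQ.take()) upperQ.put(s.toUpperCase());
                upperQ.put(EOS);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        // Node 3: sink; collects results in FIFO (arrival) order.
        Thread writer = new Thread(() -> {
            try {
                for (String s = upperQ.take(); s != EOS; s = upperQ.take()) sink.add(s);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        source.start(); transform.start(); writer.start();
        source.join(); transform.join(); writer.join(); // join makes the sink's contents visible here
        return sink;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("alpha", "beta", "gamma"))); // [ALPHA, BETA, GAMMA]
    }
}
```

Each node touches only its own input and output queues, which is what allows the three stages to run on separate cores at the same time without sharing any other state.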
First, we'll cover the overall nature and architecture of dataflow technology, specifically focusing on dataflow in software. Then we'll cover the history of dataflow technology, how it was first conceived and has matured and morphed over the years. Then we'll discuss several implementations of dataflow technology that exist in the marketplace today, highlighting a Java implementation. Read on for a peek into a technology that may have been ahead of its time but appears poised for the new multi-core world.
Yet Another Programming Paradigm
You may be asking: why another programming paradigm? First, the languages available to us don't directly support the needs of application builders. As mentioned before, the java.util.concurrent package provides most of the constructs needed to build scalable applications. However, these are lower-level building blocks, and it can take many months to become familiar with them and apply them correctly.
Second, the programming frameworks that have traditionally been used to build highly scalable applications have been targeted at the academic and scientific community. Frameworks such as MPI and OpenMP have been used to solve very large, complex problems, taking advantage of some of the world's largest computers and computer grids. However, the confluence of the information explosion with the availability of very inexpensive, off-the-shelf hardware has put high-performance computing within reach of even small and medium-sized businesses. These businesses face ever-increasing demands to process more data in shorter time periods. What they don't have is a staff of concurrent programming experts. (See Figure 1)
At one extreme we have Java and all of the functionality it provides. The building blocks for creating scalable applications are there, but there is a cost to tapping into this functionality. At the other extreme are frameworks such as MPI and OpenMP. They, too, are highly capable, but they have traditionally been used by the academic and scientific computing world, and they are not easy tools to use. Something is needed to bridge this gap and bring high-performance, highly scalable data processing to the business world. Dataflow technology can be one way to bridge that gap.
Dataflow Programming Model
Dataflow is an alternative to the standard von Neumann model of computation. Typically, we think of a program as a series of instructions, each executed one after the other by a processor that tracks its progress with an instruction pointer. In dataflow, by contrast, computations are joined to one another by channels that carry data in one direction. Conceptually, you can think of this structure as a directed graph with data channels as edges and processes performing computation on the data as nodes. Each process operates only when data is available; the data flowing through the network is all that's needed to organize the computation. The immediate advantage is that many of the processes can operate simultaneously, allowing dataflow applications to take advantage of hardware with multiple processor cores. Notice that the concurrency happens outside the processes: the developer doesn't have to bother with threads, deadlock detection, starvation, or concurrent memory access to build parallelism into an application.
This type of implicit parallelism stands in stark contrast to the concurrency mechanisms of many other programming paradigms. Gone are the locks used for concurrent programming in imperative languages like C. Locks lack composability: two correct snippets of code using locks may not be correct when they're combined. Dataflow, on the other hand, allows composability: as long as the I/O contract is respected, sub-graphs may replace nodes or be spliced between them in the original dataflow network. This aids both program correctness, since sub-graphs can be tested as they're constructed and then linked together to form larger programs, and code reuse, since commonly used sub-graphs can be copied from one application to the next.
Dataflow process networks bear some relationship to the dataflow variables of declarative programming languages like Oz. A dataflow variable is simply a variable that starts out unbound; its value is supplied later, typically by a separate thread operating in parallel. If the dataflow variable is referenced before it's been bound, the referencing thread pauses until the value arrives. Combined with a single-assignment store (each variable can be bound at most once), these variables lead to the nice property that it doesn't matter in what order we evaluate simultaneously executing expressions. Likewise, the outcome of a dataflow network is determined uniquely by its input, regardless of the order in which processes fire. Firing order does affect queue sizes and performance, of course, but those concerns can be handled outside the program itself, dramatically simplifying the task of the dataflow programmer. (See Figure 2)
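The firing rule described above can be sketched with plain java.util.concurrent queues, again as an illustration rather than any framework's API. The hypothetical adder node below has two input channels and fires only when a token is present on both; BlockingQueue.take() does all the waiting, so the node body contains no locks or explicit coordination even though its inputs arrive at different times and rates. The thread is created by hand here; in a dataflow runtime, the framework would own that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FiringRuleSketch {
    // A two-input node: it "fires" only when a token is present on BOTH inputs.
    static Thread adder(BlockingQueue<Integer> a, BlockingQueue<Integer> b,
                        BlockingQueue<Integer> out, int firings) {
        return new Thread(() -> {
            try {
                for (int i = 0; i < firings; i++) {
                    int x = a.take(); // waits until the left token arrives
                    int y = b.take(); // waits until the right token arrives
                    out.put(x + y);   // emits exactly one result per firing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> a = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> b = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> out = new LinkedBlockingQueue<>();
        Thread node = adder(a, b, out, 3);
        node.start();
        // The left input arrives all at once; the right input trickles in. The node just waits.
        for (int i = 1; i <= 3; i++) a.put(i);
        for (int i = 1; i <= 3; i++) { Thread.sleep(10); b.put(10 * i); }
        node.join();
        List<Integer> results = new ArrayList<>();
        out.drainTo(results);
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [11, 22, 33]
    }
}
```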
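Composability can be illustrated with a small, hypothetical I/O contract: a Stage reads Integer tokens from one queue until an end marker and writes its results to another. A sub-graph built by joining two stages with an internal queue satisfies the same contract, so it can be spliced in wherever a single node was used. The names Stage, map, then, runGraph, and END are assumptions made for this sketch, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class CompositionSketch {
    // Hypothetical I/O contract: consume tokens from `in` until END, write results (then END) to `out`.
    interface Stage {
        void run(BlockingQueue<Integer> in, BlockingQueue<Integer> out) throws InterruptedException;
    }

    static final int END = Integer.MIN_VALUE; // end-of-stream marker; assumes no data token uses this value

    // A primitive node built from a per-token function.
    static Stage map(Function<Integer, Integer> f) {
        return (in, out) -> {
            for (int v = in.take(); v != END; v = in.take()) out.put(f.apply(v));
            out.put(END);
        };
    }

    // A sub-graph: two stages joined by an internal queue. It honors the same
    // Stage contract, so it can stand in for any single node.
    static Stage then(Stage first, Stage second) {
        return (in, out) -> {
            BlockingQueue<Integer> mid = new LinkedBlockingQueue<>();
            Thread t = new Thread(() -> {
                try { first.run(in, mid); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            t.start();            // the first stage runs concurrently with the second
            second.run(mid, out);
            t.join();
        };
    }

    static List<Integer> runGraph(Stage stage, List<Integer> input) throws InterruptedException {
        BlockingQueue<Integer> in = new LinkedBlockingQueue<>(input);
        in.put(END);
        BlockingQueue<Integer> out = new LinkedBlockingQueue<>();
        stage.run(in, out);
        List<Integer> result = new ArrayList<>();
        for (int v = out.take(); v != END; v = out.take()) result.add(v);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        Stage single = map(x -> (x + 1) * 2);
        Stage subGraph = then(map(x -> x + 1), map(x -> x * 2)); // same contract, spliced in
        System.out.println(runGraph(single, List.of(1, 2, 3)));   // [4, 6, 8]
        System.out.println(runGraph(subGraph, List.of(1, 2, 3))); // [4, 6, 8]
    }
}
```

The caller cannot tell the single node from the two-node sub-graph, which is the property that lets sub-graphs be tested in isolation and then reused as building blocks.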
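Java has no built-in dataflow variables, but the behavior described above can be approximated in a few lines. The hypothetical DataflowVariable class below is a sketch, not a library type: get() suspends the caller until the variable is bound, and a second bind() is rejected, mimicking a single-assignment store. (Oz also allows rebinding to an equal value; this sketch simply rejects every second binding.) The demo binds x and y from two threads in arbitrary order, and the sum is the same either way.

```java
import java.util.concurrent.CountDownLatch;

/** A single-assignment (dataflow) variable in the spirit of Oz; hypothetical, not a library class. */
public class DataflowVariable<T> {
    private final CountDownLatch bound = new CountDownLatch(1);
    private T value;           // published to readers via the latch's happens-before guarantee
    private boolean assigned;  // guarded by this

    /** Binds the variable; a second binding is an error, as in a single-assignment store. */
    public void bind(T v) {
        synchronized (this) {
            if (assigned) throw new IllegalStateException("already bound");
            assigned = true;
            value = v;
        }
        bound.countDown(); // releases every reader waiting in get()
    }

    /** Reads the variable, suspending the calling thread until it has been bound. */
    public T get() throws InterruptedException {
        bound.await();
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        DataflowVariable<Integer> x = new DataflowVariable<>();
        DataflowVariable<Integer> y = new DataflowVariable<>();
        DataflowVariable<Integer> sum = new DataflowVariable<>();
        // The consumer starts first; it simply blocks until x and y are bound.
        new Thread(() -> {
            try { sum.bind(x.get() + y.get()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }).start();
        new Thread(() -> y.bind(2)).start();  // binding order does not affect the result
        new Thread(() -> x.bind(40)).start();
        System.out.println(sum.get()); // 42
    }
}
```

java.util.concurrent.CompletableFuture offers similar blocking reads, but its complete method quietly returns false on a second value instead of rejecting it, so a dedicated class makes the single-assignment rule explicit.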
The History of Dataflow
In the early 1970s, many people grew skeptical of the von Neumann architecture's ability to cope with parallelism. The global instruction pointer and memory could both become bottlenecks in concurrent software if it wasn't carefully designed. Dataflow architecture arose as the only compelling competitor. Designed with concurrency in mind, it eliminated the global instruction pointer and memory by organizing the computation based on the flow of data through a network of processes. However, these radically different architectures proved difficult compiler targets for traditional imperative programs. Dataflow programming and languages arose in response to this need.
At this time, Jack B. Dennis developed the static dataflow model and applied it to the design of computer architectures. His model limited nodes to primitive computations, and the edges were seen as representing data dependencies among the various operations occurring in the network; each edge held only one data token at a time. The work of Gilles Kahn extended this idea in two ways. First, the edges of his dataflow graphs were unbounded first-in, first-out queues, allowing data to flow across each node at a flexible rate. Second, he allowed each node to be a complete sequential process, an approach often called large-grain dataflow. This approach tends to produce more efficient software because each thread implementing a process is given a larger share of the work, reducing the time spent switching between threads. Further, the model freed dataflow from defining the language of its processes. It became conceivable to implement processes in standard programming languages such as C or Java while still having the network of code operate according to dataflow principles.
Though the potential for separating the process language from the language or mechanism that coordinates the processes was recognized early on, the idea didn't draw much attention until the late 1980s and early 1990s. In his PhD thesis, Thomas M. Parks presented a scheduling policy for Kahn process networks that executes them with bounded queues, even on infinite input streams, whenever such an execution is possible, making practical implementations realizable. At the same time, projects focused on the software engineering aspects of dataflow began to appear. Though not technically dataflow, since it doesn't follow the dataflow execution model, J. Paul Morrison's flow-based programming explored the reusability of large-grain processes implemented in common programming languages. He applied these ideas to large systems in the banking industry and measured an improvement in programmer productivity. More modern frameworks combine the engineering benefits of languages such as Java with the dataflow execution model.
Application and Existing Implementations
Now that you have some background in dataflow technology, you're probably starting to see applications for it. The traditional arena for dataflow is signal processing, since that's where it got its start, and dataflow is still used there today, especially in the academic world. A quick Web search for dataflow will turn up many universities with research activity in signal processing using dataflow technology.
Along those lines, the LabVIEW toolset created by National Instruments has an architecture based on dataflow technology (www.ni.com/labview/). The outstanding user experience offered by this toolset lets a user build a dataflow graph of data collection and data processing nodes very quickly. LabVIEW appears to have adopted dataflow more as a programming paradigm than as a route to performance. However, with the advent of multi-core and the availability of processing power at much more affordable price points, the LabVIEW toolset is poised to provide highly scalable processing thanks to its dataflow architecture.
There are other applications for dataflow technology beyond signal processing. The pipeline nature of dataflow implementations provides a natural fit for data processing applications. Since the data is pipelined in a dataflow architecture, massive amounts of data can be processed in a highly scalable way. This ability implies that dataflow techniques can be applied to many industry problems, including:
One of the hybrid approaches to dataflow implementation has been used to create massively scalable data processing engines. Way back in the 1990s, a start-up company called Torrent created a C++-based dataflow framework named Orchestrate. This framework implemented dataflow techniques and could run across a cluster of homogeneous systems. Several Torrent customers created business intelligence applications using this dataflow framework. Torrent was eventually acquired by Ascential, which was then acquired by IBM in 2005.
Another data processing engine using dataflow architecture has been built with 100% Java technology. This engine, available from Pervasive Software, uses more of the style of flow-based programming (www.pervasivedatarush.com). It's currently available as part of a free public beta program sponsored by Pervasive. See the sidebar for more information.
Conclusion
As the information age and the multi-core wave continue to collide, more pressure is exerted on software developers to provide access to increasingly inexpensive computing horsepower. High-performance computing was once the domain of system experts, government agencies and universities. Now it's in demand by large as well as medium-sized companies with massive amounts of data to process and shrinking time windows. Compute power that used to cost millions is now available in systems that can be ordered on the Web for $20k.
All of this leads to the need for a better programming paradigm: one that lets the developer build high-performance, scalable software without the burden of low-level system knowledge. Dataflow is one such paradigm. Being a pipelined architecture, dataflow is inherently scalable. And as we've pointed out, dataflow concepts have been around for many years, and they have grown and changed as the ideas matured in the academic community.
There are also several commercial implementations of dataflow in the marketplace, a sign that dataflow technology is real and has many benefits to bring to the software development community. We encourage you to investigate dataflow concepts and determine how they can fit into your software architectures going forward.
Resources
• Johnston, Hanna, and Millar. "Advances in Dataflow Programming Languages." 2004.
http://portal.acm.org/citation.cfm?id=1013208.1013209
• Najjar, Lee, and Gao. "Advances in the Dataflow Computational Model." 1999.
http://ptolemy.eecs.berkeley.edu/publications/papers/99/dataflow/
• Parks. "Bounded Scheduling of Process Networks." 1995.
http://ptolemy.eecs.berkeley.edu/publications/papers/95/parksThesis/
• Harris, Marlow, Jones, and Herlihy. "Composable Memory Transactions." 2006.
http://research.microsoft.com/~simonpj/papers/stm/#composble
• Van Roy and Haridi. "Concepts, Techniques, and Models of Computer Programming." 2004.
http://www.info.ucl.ac.be/~pvr/book.html
SIDEBAR
Pervasive DataRush: A Dataflow Implementation
The article on dataflow concepts introduced you to dataflow technology and its relevance to multi-core processing. Here we'll discuss an implementation of dataflow technology built with Java. The application framework is called Pervasive DataRush and is currently in beta release from Pervasive Software.
DataRush is an application development framework. Its purpose is to enable the user to build data processing applications that can easily take advantage of multi-core processors to produce highly scalable software. DataRush implements many dataflow concepts and extends some of these concepts to provide several ways to dramatically increase scalability of applications.
DataRush implements an XML-based scripting language that provides the means to create reusable components and data processing applications. This language, called DFXML (dataflow XML), is simple in syntax and very flexible. DFXML is used to compose what are called assemblies. An assembly is a composite operator and can be composed of other assemblies, processes, and customizers. A process is the lowest level of operator; it's written in Java and performs the work in the dataflow graph. A customizer is a compiler helper, also written in Java, that enables the dynamic nature of the DataRush framework. DataRush includes a component library of more than 50 pre-built, ready-to-use components, including data connectivity components and data processing components such as sort, join, merge, lookup, and group.
The block diagram in Figure 1 depicts the high-level architecture of the DataRush framework.
The user utilizes an IDE such as Eclipse to create DFXML assemblies and Java processes and customizers (www.eclipse.org). The DataRush assembler is used to convert DFXML scripts into a binary form. The DataRush execution engine is then invoked to compile the binary files into a dataflow graph for execution. This compilation step is run for every engine instantiation, providing a very dynamic graph-generation capability. Once the dataflow graph is generated, the DataRush engine creates threads and dataflow queues representing the graph and executes the graph. Execution monitoring is provided via JMX.
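The article says only that execution monitoring is provided via JMX; DataRush's own MBeans aren't described here. As a generic illustration, the sketch below uses the standard javax.management API to expose one dataflow queue's depth and remaining capacity as a standard MBean that any JMX client can read. The class names and the ObjectName domain example.dataflow are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueMonitoring {
    // Standard MBean convention: the management interface is named <ImplementationClass>MBean.
    public interface QueueStatsMBean {
        int getDepth();             // exposed as attribute "Depth"
        int getRemainingCapacity(); // exposed as attribute "RemainingCapacity"
    }

    public static class QueueStats implements QueueStatsMBean {
        private final BlockingQueue<?> queue;
        public QueueStats(BlockingQueue<?> queue) { this.queue = queue; }
        @Override public int getDepth() { return queue.size(); }
        @Override public int getRemainingCapacity() { return queue.remainingCapacity(); }
    }

    /** Registers a read-only view of one dataflow queue with the platform MBean server. */
    public static ObjectName register(String queueName, BlockingQueue<?> queue) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("example.dataflow:type=Queue,name=" + queueName);
        server.registerMBean(new QueueStats(queue), name);
        return name;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(8);
        q.put("a");
        q.put("b");
        ObjectName name = register("source-to-sort", q);
        // Any JMX client (for example, JConsole) could read the same attribute remotely.
        Object depth = ManagementFactory.getPlatformMBeanServer().getAttribute(name, "Depth");
        System.out.println("Depth via JMX: " + depth); // Depth via JMX: 2
    }
}
```

Queue depth is a natural metric to publish for a dataflow graph: a queue that stays full points at a slow downstream node, and one that stays empty points at a slow upstream node.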
Dataflow Implementation
The Pervasive DataRush framework implements many of the basic structures of dataflow. Processing nodes (called processes in DataRush) are built in Java and communicate through dataflow queues. The dataflow queues in DataRush are typed; they support the native Java types as well as string, date, timestamp, and binary types.
The dataflow queues in DataRush are somewhat comparable in functionality to the blocking queue implementations in the java.util.concurrent package introduced in Java 5. Both are memory-based queues that block readers on empty queues and block writers on full queues. The DataRush queues, however, must also support deadlock detection and handling. Because queues can have multiple readers and processes can have multiple inputs and outputs, cycles of dependencies can form in a dataflow graph. These cycles can lead to deadlock, in which writers and readers wait on one another and the graph cannot continue without intervention. A deadlock algorithm in the DataRush engine detects such situations and handles them, normally by temporarily expanding the size of the problematic queue.
Besides the pipeline scalability that a dataflow architecture already provides, the Pervasive DataRush framework has built-in support for two other types of scalability: horizontal partitioning and vertical partitioning. Horizontal partitioning replicates a section of dataflow logic and segments the input data into chunks, flowing the data concurrently through the replicated dataflow sections. Figure 2 depicts this scenario using a lookup component as an example. In this example, the lookup operator is replicated, with a data partitioner spreading the data load evenly across the lookup instances. This lets each lookup operator run in parallel, fully utilizing multiple cores on the system. Vertical partitioning supports running different dataflow logic in parallel on each field of an input stream.
Figure 1: The high-level architecture of the Pervasive DataRush framework, including design and execution components.
Figure 2: Horizontal partitioning, one of three types of scalability that can be implemented using Pervasive DataRush.
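The article doesn't describe DataRush's detection algorithm beyond expanding the problematic queue, so the following is only a sketch in the spirit of Parks' bounded-scheduling policy (see Resources). When every live process is blocked and at least one is blocked writing to a full queue, the deadlock is "artificial" and can be broken by growing the smallest full queue. To keep detection simple, all queues share one monitor, and capacities are never shrunk back afterward; all class and method names are hypothetical. The demo builds the classic case: a producer fills queue qa before qb while the consumer drains qb before qa, which deadlocks at capacity 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DeadlockResolvingDemo {
    interface Node { void run() throws InterruptedException; }

    /** One monitor shared by every queue in the graph, so the detector sees a consistent snapshot. */
    static final class Graph {
        final Object lock = new Object();
        private final List<Waiter> waiters = new ArrayList<>(); // one entry per blocked process
        private int live;   // processes that have not finished
        int growths;        // how many times a queue was expanded
        private static final class Waiter {
            final GrowableQueue q; final boolean writer;
            Waiter(GrowableQueue q, boolean writer) { this.q = q; this.writer = writer; }
        }
        /** Called with the lock held, by a process about to block on q. */
        void await(GrowableQueue q, boolean writer) throws InterruptedException {
            Waiter w = new Waiter(q, writer);
            waiters.add(w);
            try {
                resolveIfDeadlocked();
                if (q.blocks(writer)) lock.wait();
            } finally {
                waiters.remove(w);
            }
        }
        /** Deadlock: every live process is waiting on a condition that is still false. */
        private void resolveIfDeadlocked() {
            if (waiters.size() < live) return;         // some process can still run
            GrowableQueue smallestFull = null;
            for (Waiter w : waiters) {
                if (!w.q.blocks(w.writer)) return;     // notified, just not running yet
                if (w.writer && (smallestFull == null || w.q.capacity < smallestFull.capacity)) smallestFull = w.q;
            }
            if (smallestFull != null) {                // artificial deadlock: grow the smallest full queue
                smallestFull.capacity++;
                growths++;
                lock.notifyAll();
            }   // otherwise only readers of empty queues remain: a true deadlock, nothing to grow
        }
        /** Counts all processes before starting any, then waits for them to finish. */
        void run(Node... nodes) throws InterruptedException {
            synchronized (lock) { live = nodes.length; }
            List<Thread> threads = new ArrayList<>();
            for (Node n : nodes) {
                Thread t = new Thread(() -> {
                    try { n.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                    finally { synchronized (lock) { live--; resolveIfDeadlocked(); } }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) t.join();
        }
    }

    /** A bounded FIFO of ints whose capacity the graph may raise. */
    static final class GrowableQueue {
        private final Graph g;
        private final ArrayDeque<Integer> items = new ArrayDeque<>();
        int capacity;
        GrowableQueue(Graph g, int capacity) { this.g = g; this.capacity = capacity; }
        boolean blocks(boolean writer) { return writer ? items.size() >= capacity : items.isEmpty(); }
        void put(int v) throws InterruptedException {
            synchronized (g.lock) {
                while (blocks(true)) g.await(this, true);
                items.add(v);
                g.lock.notifyAll();
            }
        }
        int take() throws InterruptedException {
            synchronized (g.lock) {
                while (blocks(false)) g.await(this, false);
                int v = items.poll();
                g.lock.notifyAll();
                return v;
            }
        }
    }

    static String demo() throws InterruptedException {
        Graph g = new Graph();
        GrowableQueue qa = new GrowableQueue(g, 1), qb = new GrowableQueue(g, 1);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        g.run(() -> { for (int i = 0; i < 3; i++) qa.put(i); for (int i = 10; i < 13; i++) qb.put(i); },
              () -> { for (int i = 0; i < 3; i++) seen.add(qb.take()); for (int i = 0; i < 3; i++) seen.add(qa.take()); });
        return seen + " growths=" + g.growths + " qa.capacity=" + qa.capacity;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // [10, 11, 12, 0, 1, 2] growths=2 qa.capacity=3
    }
}
```

Whatever order the threads happen to run in, the consumer sees the same sequence and qa grows only as far as the graph needs, echoing the point that scheduling affects queue sizes but not results.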
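Horizontal partitioning of a lookup operator can be sketched with the standard ExecutorService: a partitioner spreads records across n replicas of the same lookup logic, the replicas run in parallel, and the outputs are merged back into input order. To keep the sketch short, partitions are materialized lists rather than streaming queues, and partitioning is round-robin. The lookup operator and all names are hypothetical, not DataRush components.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HorizontalPartitionSketch {
    /** Hypothetical lookup operator: enrich each key from a read-only reference table. */
    static List<String> lookup(List<Integer> keys, Map<Integer, String> table) {
        List<String> out = new ArrayList<>(keys.size());
        for (int k : keys) out.add(table.getOrDefault(k, "unknown"));
        return out;
    }

    /** Round-robin partitioner, n replicated lookups running in parallel, then an order-preserving merge. */
    static List<String> partitionedLookup(List<Integer> keys, Map<Integer, String> table, int n)
            throws Exception {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < n; p++) parts.add(new ArrayList<>());
        for (int i = 0; i < keys.size(); i++) parts.get(i % n).add(keys.get(i)); // spread the load evenly

        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<Integer> part : parts) futures.add(pool.submit(() -> lookup(part, table)));
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> f : futures) results.add(f.get());
            // Record i went to partition i % n at position i / n; undo that to restore input order.
            List<String> merged = new ArrayList<>(keys.size());
            for (int i = 0; i < keys.size(); i++) merged.add(results.get(i % n).get(i / n));
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, String> table = Map.of(1, "one", 2, "two", 3, "three");
        System.out.println(partitionedLookup(List.of(3, 1, 4, 2, 1), table, 2));
        // [three, one, unknown, two, one]
    }
}
```

Round-robin keeps the partitions within one record of each other in size, which suits a stateless lookup. Operators such as group or join would instead need a key-based partitioner, so that equal keys land in the same replica.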
Why Java?
As the article on dataflow points out, there have been many implementations of dataflow technology over the years. Most of them have been written in C or C++, which makes sense given the prevalence of C and C++ when those systems were built. When DataRush was first being developed, the decision was made to use Java as the programming language. This decision was based on several factors: portability, flexibility, extensibility, and scalability, and you can throw in productivity for good measure. The decision was also based on the high level of industry investment in JVM technology. Over the past few years, we've seen significant performance improvements with each JDK release. Also, the number of open source libraries available is astounding. With such a rich environment, the decision has proved to be a good one.
The question of Java and performance always arises. What we've found, with the introduction of the java.nio package and other JVM performance enhancements, is that native speeds can be obtained from Java. This is especially true for frameworks like DataRush, in which a static set of classes (the process nodes) is used over a relatively long period of time. That scenario is well suited to JIT compilation.
A Simple Benchmark
To demonstrate the scalability of the DataRush framework, we developed a simple benchmark implementing a one-pass K-means algorithm. The algorithm treats pairs of double-typed values as points and clusters the points into groups of similar points. The benchmark measures the performance of running K-means on 100 input columns over 10 million rows of data. For this particular test, the input data is generated. As Figure 3 shows, the performance of the benchmark test improves as more CPU resources are made available. A snapshot of the CPU utilization is also provided (Figure 4), showing that the DataRush framework was able to keep the machine heavily utilized for the duration of the test.
Figure 3: Benchmark results of a K-means test run on an 8-core machine, demonstrating how a non-parallelized application fails to scale as more compute resources are added.
Figure 4: CPU usage during the K-means benchmark. The Pervasive DataRush platform has scaled to take full advantage of all 8 cores available on the machine used for this test.
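The benchmark's exact algorithm isn't spelled out, so the following is only one plausible reading of "one-pass K-means": MacQueen's sequential variant for 2-D points. The first k points seed the centroids, and each later point moves its nearest centroid toward itself by 1/count, so the data is read exactly once. It shows the per-record work a K-means process node would do, not how DataRush spreads that work across cores; the class and method names are hypothetical, and the input must contain at least k points.

```java
import java.util.Arrays;

public class OnePassKMeans {
    /**
     * Single-pass (MacQueen-style) k-means over 2-D points: the first k points seed the
     * centroids; every later point moves its nearest centroid toward itself by 1/count.
     */
    static double[][] cluster(double[][] points, int k) {
        double[][] c = new double[k][];
        long[] count = new long[k];
        for (int j = 0; j < k; j++) { c[j] = points[j].clone(); count[j] = 1; }
        for (int i = k; i < points.length; i++) {
            double[] p = points[i];
            int best = 0;
            double bestD = Double.MAX_VALUE;
            for (int j = 0; j < k; j++) {
                double dx = p[0] - c[j][0], dy = p[1] - c[j][1];
                double d = dx * dx + dy * dy; // squared Euclidean distance
                if (d < bestD) { bestD = d; best = j; }
            }
            count[best]++;
            c[best][0] += (p[0] - c[best][0]) / count[best]; // incremental mean update
            c[best][1] += (p[1] - c[best][1]) / count[best];
        }
        return c;
    }

    public static void main(String[] args) {
        double[][] pts = { {0, 0}, {10, 10}, {0, 2}, {10, 12} };
        System.out.println(Arrays.deepToString(cluster(pts, 2))); // [[0.0, 1.0], [10.0, 11.0]]
    }
}
```

Because each centroid is a running mean, the node needs only O(k) state no matter how many rows stream through it, which is what makes a single pass over 10 million rows practical.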
Conclusion
The DataRush application development framework implements dataflow concepts that enable Java programmers to create highly scalable applications that can process many millions of rows of data. The framework is currently in beta release and can be downloaded at www.pervasivedatarush.com. DataRush is built completely in Java, so it is easy to install and begin using right away. A user interface in the Eclipse IDE is being developed, so please check the site periodically for updates. The site also includes more information on DataRush and forums for discussion and questions.