Stream merge: Difference between revisions

From Rosetta Code
Content added Content deleted
m (→‎REXX: check order of Input (and correct previous wrong Input))
(+ Haskell)
Line 1: Line 1:
{{task}}
{{task}}

; 2-stream merge
; 2-stream merge
: Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
: Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
Line 8: Line 7:
: Common algorithm: same as above, but keep buffered items and their source descriptors in a [[heap]].
: Common algorithm: same as above, but keep buffered items and their source descriptors in a [[heap]].


Assuming streams are very big. You must not suck them whole in the memory, but read them as streams.
Assume streams are very big. You must not suck them whole in the memory, but read them as streams.

== Haskell ==

There is no built-in iterator or stream type for file operations in Haskell. But several such libraries exist.

=== pipes ===

<lang haskell>-- stack runhaskell --package=heap --package=pipes-safe

import Data.Either (rights)
import Data.Heap (MinPrioHeap)
import qualified Data.Heap as Heap
import Data.Traversable (for)
import Pipes (Producer, lift, next, runEffect, yield, (>->))
import Pipes.Prelude (stdoutLn)
import Pipes.Safe (runSafeT)
import Pipes.Safe.Prelude (readFile)
import Prelude hiding (readFile)
import System.Environment (getArgs)

merge :: (Ord a, Monad m) => [Producer a m ()] -> Producer a m ()
merge sources = do
firstResults <- for sources $ \source ->
lift $ next source
let sourceHeap = Heap.fromList $ rights firstResults
mergeHeap sourceHeap

mergeHeap :: (Ord a, Monad m) => MinPrioHeap a (Producer a m ()) -> Producer a m ()
mergeHeap sourceHeap =
case Heap.view sourceHeap of
Nothing ->
pure ()
Just ((a, src1), sourceHeap1) -> do
yield a
res <- lift $ next src1
let sourceHeap2 = case res of
Left () -> sourceHeap1
Right (b, src2) -> Heap.insert (b, src2) sourceHeap1
mergeHeap sourceHeap2

main :: IO ()
main = do
sourceFileNames <- getArgs
let sources = map readFile sourceFileNames
runSafeT . runEffect $ merge sources >-> stdoutLn</lang>


== Python ==
== Python ==
Line 23: Line 67:
print(item)</lang>
print(item)</lang>


== Shell ==
== REXX ==

sort --merge source1 source2 sourceN > sink

==REXX==
<lang rexx>/**********************************************************************
<lang rexx>/**********************************************************************
* Merge 1.txt ... n.txt into m.txt
* Merge 1.txt ... n.txt into m.txt
Line 88: Line 128:
500
500
8</pre>
8</pre>

== Shell ==

sort --merge source1 source2 sourceN > sink

Revision as of 13:15, 15 June 2016

Task
Stream merge
You are encouraged to solve this task according to the task description, using any language you may know.
2-stream merge
Read two sorted streams of items from external source (e.g. disk, or network), and write one stream of sorted items to external sink.
Common algorithm: keep 1 buffered item from each source, select minimal of them, write it, fetch another item from that stream from which the written item was.
N-stream merge
The same as above, but reading from N sources.
Common algorithm: same as above, but keep buffered items and their source descriptors in a heap.

Assume streams are very big. You must not suck them whole in the memory, but read them as streams.

Haskell

There is no built-in iterator or stream type for file operations in Haskell. But several such libraries exist.

pipes

<lang haskell>-- stack runhaskell --package=heap --package=pipes-safe

import Data.Either (rights) import Data.Heap (MinPrioHeap) import qualified Data.Heap as Heap import Data.Traversable (for) import Pipes (Producer, lift, next, runEffect, yield, (>->)) import Pipes.Prelude (stdoutLn) import Pipes.Safe (runSafeT) import Pipes.Safe.Prelude (readFile) import Prelude hiding (readFile) import System.Environment (getArgs)

merge :: (Ord a, Monad m) => [Producer a m ()] -> Producer a m () merge sources = do

   firstResults <- for sources $ \source ->
       lift $ next source
   let sourceHeap = Heap.fromList $ rights firstResults
   mergeHeap sourceHeap

mergeHeap :: (Ord a, Monad m) => MinPrioHeap a (Producer a m ()) -> Producer a m () mergeHeap sourceHeap =

   case Heap.view sourceHeap of
       Nothing ->
           pure ()
       Just ((a, src1), sourceHeap1) -> do
           yield a
           res <- lift $ next src1
           let sourceHeap2 = case res of
                   Left () -> sourceHeap1
                   Right (b, src2) -> Heap.insert (b, src2) sourceHeap1
           mergeHeap sourceHeap2

main :: IO () main = do

   sourceFileNames <- getArgs
   let sources = map readFile sourceFileNames
   runSafeT . runEffect $ merge sources >-> stdoutLn</lang>

Python

Built-in function open opens a file for reading and returns a line-by-line iterator (stream) over the file.

There exists a standard library function heapq.merge that takes any number of sorted stream iterators and merges them into one sorted iterator, using a heap.

<lang python>import heapq import sys

sources = sys.argv[1:] for item in heapq.merge(open(source) for source in sources):

   print(item)</lang>

REXX

<lang rexx>/**********************************************************************

  • Merge 1.txt ... n.txt into m.txt
  • 1.txt 2.txt 3.txt 4.txt
  • 1 19 1999 2e3
  • 17 33 2999 3000
  • 8 500 3999
                                                                                                                                            • /

n=4 high='ffff'x p.= Do i=1 To n

 f.i=i'.txt'
 Call get i
 End

Do Forever

 min=high
 Do i=1 To n
   If x.i<<min Then Do    /* avoid numerical comparison */
     imin=i
     min=x.i
     End
   End
 If min<<high Then Do
   Call o x.imin
   Call get imin
   End
 Else Do
   Call lineout oid
   Leave
   End
 End

Exit get: Procedure Expose f. x. high p.

 Parse Arg ii
 If lines(f.ii)=0 Then
   x.ii=high
 Else Do
   x.ii=linein(f.ii)
   If x.ii<<p.ii Then Do
     Say 'Input file' f.ii 'is not sorted ascendingly'
     Say p.ii 'precedes' x.ii
     Exit
     End
   p.ii=x.ii
   End
 Return

o: Say arg(1)

  Return lineout(oid,arg(1))</lang>
Output:
1
17
19
1999
2999
2e3
3000
33
3999
500
8

Shell

sort --merge source1 source2 sourceN > sink