I want to write a Haskell function that receives a vector and returns another one with the same size but delayed by a certain number of samples d (add zeroes at the start). Because the produced vector must have the same size, it must buffer the remaining samples so that the next application of this function from C continues with the previously buffered samples. To do so I implemented a ring-buffer concept and hooked it up to C through FFI.
delay :: Int -> V.Vector Float -> V.Vector Float
delay d inp = unsafePerformIO $ do
    ini <- MV.replicate d 0.0
    buf <- newIORef ini
    inx <- newIORef 0
    let f v = unsafePerformIO $ do
            i <- readIORef inx
            b <- readIORef buf
            r <- MV.unsafeExchange b i v
            writeIORef buf b
            writeIORef inx ((i+1) `mod` d)
            return r
    return $ V.map f inp
Note that there are two types of vectors here: Data.StorableVector as V (not to be confused with Data.Vector.Storable), and Data.Vector.Storable.Mutable as MV for the ring buffer.
type Process = (V.Vector Float -> V.Vector Float)

foreign export ccall
    startCtx :: IO(StablePtr Process)
startCtx = newStablePtr $ delay 2

foreign export ccall
    freeCtx :: StablePtr Process -> IO()
freeCtx = freeStablePtr

foreign export ccall
    hs_process :: StablePtr Process -> Int -> Ptr Float -> Ptr Float -> IO()
hs_process pf ns i o = do
    f <- deRefStablePtr pf
    iv <- V.peek ns i
    V.poke o $ f iv
On the C side:
#include "Process_stub.h"
#include <vector>
#include <cstdio>   // for printf
using namespace std;
extern "C" {
    void HsStart();
    void HsEnd();
}
vector<float> input1 = {1.0, 2.0, 3.0, 4.0, 5.0},
              input2 = {6.0, 7.0, 8.0, 9.0, 10.0},
              output(input1.size(), 0.0);
int main(int argc, char *argv[])
{
    HsStart();
    auto pf = startCtx();
    hs_process(pf, input1.size(), input1.data(), output.data());
    for(int i = 0; i < input1.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);
    hs_process(pf, input2.size(), input2.data(), output.data());
    for(int i = 0; i < input2.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);
    freeCtx(pf);
    HsEnd();
    return 0;
}
What I expect:
First call of hs_process:
[0] input = 1 | output = 0
[1] input = 2 | output = 0
[2] input = 3 | output = 1
[3] input = 4 | output = 2
[4] input = 5 | output = 3
Second call of hs_process:
[0] input = 6 | output = 4
[1] input = 7 | output = 5
[2] input = 8 | output = 6
[3] input = 9 | output = 7
[4] input = 10 | output = 8
But what I get instead:
First call of hs_process:
[0] input = 1 | output = 0
[1] input = 2 | output = 0
[2] input = 3 | output = 1
[3] input = 4 | output = 2
[4] input = 5 | output = 3
Second call of hs_process:
[0] input = 6 | output = 0
[1] input = 7 | output = 0
[2] input = 8 | output = 6
[3] input = 9 | output = 7
[4] input = 10 | output = 8
I can see what I'm doing wrong, although I can't explain it properly. I'm just keeping the function, not the applicative per se. I would like to be able to keep every (possibly chained) delay call alive by some closure in the StablePtr.
If this is for production code, it makes more sense to write your delay function in C++.
If this is just to figure out how to do it in Haskell, then understand that this sort of thing must be done in the IO monad and NOT using unsafe operations. (The reason the operations are unsafe is that using them will result in exactly the sort of weird behavior you're seeing. Function closures won't help here.)
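One way to see where the repeated zeros come from: in the question's delay, the buffer is created inside the body of delay d inp, so it is most likely re-created for every input vector. The sketch below reproduces both behaviors in plain IO, using lists and an IORef instead of the question's vector types. The names step, freshPerCall and allocatedOnce are made up for this illustration, and it assumes every block has at least d samples.

```haskell
import Data.IORef

-- Push one block through a d-sample delay line stored in an IORef.
-- Assumption: length block >= d.
step :: Int -> IORef [Float] -> [Float] -> IO [Float]
step d ref block = do
  pending <- readIORef ref
  let (now, later) = splitAt (length block - d) block
  writeIORef ref later          -- keep the last d samples for the next call
  pure (pending ++ now)

-- State allocated on every call: each block starts from zeros again.
freshPerCall :: Int -> [Float] -> IO [Float]
freshPerCall d block = do
  ref <- newIORef (replicate d 0)
  step d ref block

-- State allocated once and captured by the returned action.
allocatedOnce :: Int -> IO ([Float] -> IO [Float])
allocatedOnce d = do
  ref <- newIORef (replicate d 0)
  pure (step d ref)

demo :: IO ()
demo = do
  -- prints [0.0,0.0,1.0,2.0,3.0] then [0.0,0.0,6.0,7.0,8.0]
  mapM_ (\b -> freshPerCall 2 b >>= print) [[1,2,3,4,5],[6,7,8,9,10]]
  run <- allocatedOnce 2
  -- prints [0.0,0.0,1.0,2.0,3.0] then [4.0,5.0,6.0,7.0,8.0]
  mapM_ (\b -> run b >>= print) [[1,2,3,4,5],[6,7,8,9,10]]
```

The first pair of lines matches the output reported in the question, the second pair matches what was expected. The only difference is whether the state is allocated before or after the input block is supplied.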
The simplest way to implement delay in Haskell is to add a parameter to the function representing the "pipe" that's being used. This can be an opaque value to the C program, but on the Haskell side, it can be some direct representation of the sequence of pending, delayed elements.
There's no real reason to use a ring buffer here. You just need a buffer for the "extra" elements, which we can represent as a storable array via Foreign.Marshal.Array:
data Pipe = Pipe Int (Ptr Float)

c_new_delay :: Int -> IO (StablePtr Pipe)
c_new_delay d = do
    pipe <- Pipe d <$> newArray (replicate d 0)
    newStablePtr pipe
Note that allocating a new delay pipe is an IO operation, so that two invocations of c_new_delay with the same delay size (c_new_delay 2 followed by c_new_delay 2) can return two different pipes.
We can run input through a delay pipe using a function that takes the pipe as its first argument. This, too, needs to be an IO operation, rather than a pure operation. Otherwise, it wouldn't be possible to pass the same pipe and input to the function in two consecutive calls and generate different output each time.
c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
c_run_delay pipe n input output = do
    Pipe d delayed <- deRefStablePtr pipe
    copyArray output delayed d
    copyArray (advancePtr output d) input (n-d)
    copyArray delayed (advancePtr input (n-d)) d
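Above, I've just used the copyArray primitive to copy slices between the input, the array of delayed elements, and the output. Note that this assumes each block holds at least d samples (n >= d); otherwise the slice lengths would be negative. With a final function to free a pipe: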
c_free_delay :: StablePtr Pipe -> IO ()
c_free_delay pipe = do
    Pipe _ delayed <- deRefStablePtr pipe
    free delayed
    freeStablePtr pipe
this is more or less a plug-in replacement for your C++ interface. The full code is:
-- Delay.hs
{-# LANGUAGE ForeignFunctionInterface #-}
module Delay where

import Foreign.Ptr
import Foreign.StablePtr
import Foreign.Marshal.Array
import Foreign.Marshal.Alloc

data Pipe = Pipe Int (Ptr Float)

-- |Create a new delay pipe of given size, internally represented
-- as a malloced C-compatible array of delayed items.
c_new_delay :: Int -> IO (StablePtr Pipe)
c_new_delay d = do
    pipe <- Pipe d <$> newArray (replicate d 0)
    newStablePtr pipe

-- |Feed input to the delay pipe. NOTE: input and output buffers
-- may not overlap, and n must be at least the delay size d.
c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
c_run_delay pipe n input output = do
    Pipe d delayed <- deRefStablePtr pipe
    copyArray output delayed d                    -- emit the d pending samples
    copyArray (advancePtr output d) input (n-d)   -- then the first n-d inputs
    copyArray delayed (advancePtr input (n-d)) d  -- keep the last d inputs

-- |Free a delay pipe.
c_free_delay :: StablePtr Pipe -> IO ()
c_free_delay pipe = do
    Pipe _ delayed <- deRefStablePtr pipe
    free delayed
    freeStablePtr pipe

foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
/* delay.cc */
#include "Delay_stub.h"
#include <vector>
#include <cstdio>
using namespace std;
vector<float> input1 = {1.0, 2.0, 3.0, 4.0, 5.0},
              input2 = {6.0, 7.0, 8.0, 9.0, 10.0},
              output(input1.size(), 0.0);
int main(int argc, char *argv[])
{
    hs_init(&argc, &argv);
    auto pf = new_delay(2);
    run_delay(pf, input1.size(), input1.data(), output.data());
    for(int i = 0; i < input1.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);
    run_delay(pf, input2.size(), input2.data(), output.data());
    for(int i = 0; i < input2.size(); i++)
        printf("[%d] output = %f\n", i, output[i]);
    free_delay(pf);
    hs_exit();  // shut down the Haskell runtime started by hs_init
    return 0;
}
and compiling and running with:
$ ghc -no-hs-main -o delay Delay.hs delay.cc && ./delay
produces the expected output:
[1 of 2] Compiling Delay ( Delay.hs, Delay.o ) [Source file changed]
[2 of 2] Linking delay [Objects changed]
[0] output = 0.000000
[1] output = 0.000000
[2] output = 1.000000
[3] output = 2.000000
[4] output = 3.000000
[0] output = 4.000000
[1] output = 5.000000
[2] output = 6.000000
[3] output = 7.000000
[4] output = 8.000000
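The c_run_delay above copies n-d samples, so it relies on every block holding at least d samples. If shorter blocks can occur, one possible extension is sketched below. It is only an illustration under that assumption: runDelayAnySize and feed are hypothetical names, not part of the code above, and the short-block branch uses moveArray because the source and destination regions overlap.

```haskell
import Foreign.Marshal.Alloc (free)
import Foreign.Marshal.Array
  (advancePtr, allocaArray, copyArray, moveArray, newArray, peekArray, withArrayLen)
import Foreign.Ptr (Ptr)

-- Delay n input samples by d, keeping d pending samples in `delayed`.
-- Input and output buffers must not overlap.
runDelayAnySize :: Int -> Ptr Float -> Int -> Ptr Float -> Ptr Float -> IO ()
runDelayAnySize d delayed n input output
  | n >= d = do
      copyArray output delayed d                     -- all pending samples
      copyArray (advancePtr output d) input (n - d)  -- then the first n-d inputs
      copyArray delayed (advancePtr input (n - d)) d -- keep the last d inputs
  | otherwise = do
      copyArray output delayed n                         -- the n oldest pending samples
      moveArray delayed (advancePtr delayed n) (d - n)   -- shift the rest to the front
      copyArray (advancePtr delayed (d - n)) input n     -- append the whole block

-- Helper for trying it out from Haskell with lists.
feed :: Int -> Ptr Float -> [Float] -> IO [Float]
feed d delayed xs =
  withArrayLen xs $ \n input ->
    allocaArray n $ \output -> do
      runDelayAnySize d delayed n input output
      peekArray n output

demo :: IO ()
demo = do
  delayed <- newArray (replicate 3 0)
  -- prints [0.0,0.0], [0.0,1.0], [2.0,3.0,4.0,5.0]
  mapM_ (\xs -> feed 3 delayed xs >>= print) [[1,2],[3,4],[5,6,7,8]]
  free delayed
```

With a delay of 3, the concatenated output is the input stream preceded by three zeros, regardless of how the stream is cut into blocks.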
Note that if you had an existing Haskell delay function meant to be called from Haskell code, it still wouldn't be written as a pure function. Instead, you'd use something like @chi's suggestion in the IO monad:
delay :: Int -> IO (Vector Float -> IO (Vector Float))
The justification here is that delay 2 can't return a pure value, or else the code:
pipe1 = delay 2
pipe2 = delay 2
would end up defining pipe1 and pipe2 as the same pipe, and you want these to be independent pipes. By making delay 2 return an IO value, you can write some do-notation:
pipe1 <- delay 2
pipe2 <- delay 2
and get the independent pipes you expect. Similarly, pipe1 itself can't be a pure Vector Float -> Vector Float function, or else running it twice on the same input:
pipe1 [1,2,3,4,5]
pipe1 [1,2,3,4,5]
would produce the same result, and you want different results (first [0,0,1,2,3], and then [4,5,1,2,3]). So, again, by having it return an IO value, you can write:
result1 <- pipe1 [1,2,3,4,5]
result2 <- pipe1 [1,2,3,4,5]
and reasonably expect to get two different results.
Anyway, if you had an existing Haskell implementation of this form, like the following, which uses an IORef to hold an immutable vector of the delayed elements between calls:
{-# LANGUAGE OverloadedLists #-}
module Delay where

import Data.IORef
import qualified Data.Vector.Unboxed as V

delay :: Int -> IO (V.Vector Float -> IO (V.Vector Float))
delay n = do
    r <- newIORef (V.replicate n 0)
    pure $ \v -> atomicModifyIORef r $ \v_delayed ->
        let (v1, v2) = V.splitAt (V.length v - n) v
        in (v2, v_delayed V.++ v1)

test :: IO ()
test = do
    d <- delay 2
    print =<< d [1,2,3,4,5]  -- output: [0.0,0.0,1.0,2.0,3.0]
    print =<< d [1,2,3,4,5]  -- output: [4.0,5.0,1.0,2.0,3.0]
and wanted to expose a C API, you could use a StablePtr to the Vector Float -> IO (Vector Float) portion, similar to what you already tried. Since I'm using Data.Vector.Unboxed, I used slightly different code to marshal the vectors into and out of the C++ buffers.
import Foreign.Ptr
import Foreign.StablePtr
import Foreign.Storable

type Pipe = V.Vector Float -> IO (V.Vector Float)

c_new_delay :: Int -> IO (StablePtr Pipe)
c_new_delay d = delay d >>= newStablePtr

c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
c_run_delay pipe n input output = do
    pipe' <- deRefStablePtr pipe
    input' <- V.generateM n (peekElemOff input)
    output' <- pipe' input'
    V.imapM_ (pokeElemOff output) output'

c_free_delay :: StablePtr Pipe -> IO ()
c_free_delay = freeStablePtr

foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
This could be substituted for the Delay.hs module above and would work fine with my posted delay.cc. The full alternative Delay.hs module is:
-- Delay.hs, alternative version
{-# LANGUAGE OverloadedLists #-}
module Delay where

import Data.IORef
import qualified Data.Vector.Unboxed as V
import Foreign.Ptr
import Foreign.StablePtr
import Foreign.Storable

delay :: Int -> IO (V.Vector Float -> IO (V.Vector Float))
delay n = do
    r <- newIORef (V.replicate n 0)
    pure $ \v -> atomicModifyIORef r $ \v_delayed ->
        let (v1, v2) = V.splitAt (V.length v - n) v
        in (v2, v_delayed V.++ v1)

test :: IO ()
test = do
    d <- delay 2
    print =<< d [1,2,3,4,5]  -- output: [0.0,0.0,1.0,2.0,3.0]
    print =<< d [1,2,3,4,5]  -- output: [4.0,5.0,1.0,2.0,3.0]

type Pipe = V.Vector Float -> IO (V.Vector Float)

c_new_delay :: Int -> IO (StablePtr Pipe)
c_new_delay d = delay d >>= newStablePtr

c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
c_run_delay pipe n input output = do
    pipe' <- deRefStablePtr pipe
    input' <- V.generateM n (peekElemOff input)
    output' <- pipe' input'
    V.imapM_ (pokeElemOff output) output'

c_free_delay :: StablePtr Pipe -> IO ()
c_free_delay = freeStablePtr

foreign export ccall "new_delay" c_new_delay :: Int -> IO (StablePtr Pipe)
foreign export ccall "run_delay" c_run_delay :: StablePtr Pipe -> Int -> Ptr Float -> Ptr Float -> IO ()
foreign export ccall "free_delay" c_free_delay :: StablePtr Pipe -> IO ()
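If you want to check the StablePtr round trip without involving C++ at all, the same shape can be exercised from Haskell. The following is a rough sketch using only base: listDelay is a list-based stand-in for delay, runPipe mirrors c_run_delay with peekArray/pokeArray marshalling, and callLikeC plays the role of the C++ caller. All of these names are invented for the example, and blocks are assumed to hold at least d samples.

```haskell
import Data.IORef (newIORef, atomicModifyIORef')
import Foreign.Marshal.Array (allocaArray, peekArray, pokeArray, withArrayLen)
import Foreign.Ptr (Ptr)
import Foreign.StablePtr (StablePtr, newStablePtr, deRefStablePtr, freeStablePtr)

type ListPipe = [Float] -> IO [Float]

-- List-based stand-in for `delay`; assumes each block has at least d samples.
listDelay :: Int -> IO ListPipe
listDelay d = do
  ref <- newIORef (replicate d 0)
  pure $ \xs -> atomicModifyIORef' ref $ \pending ->
    let (now, later) = splitAt (length xs - d) xs
    in (later, pending ++ now)   -- (new state, result)

-- The stateful closure is what gets stored behind the StablePtr.
newPipe :: Int -> IO (StablePtr ListPipe)
newPipe d = listDelay d >>= newStablePtr

-- Same shape as c_run_delay, with list marshalling.
runPipe :: StablePtr ListPipe -> Int -> Ptr Float -> Ptr Float -> IO ()
runPipe sp n input output = do
  pipe <- deRefStablePtr sp
  xs <- peekArray n input
  ys <- pipe xs
  pokeArray output ys

-- Stand-in for the C++ side: allocate buffers, call, read the result back.
callLikeC :: StablePtr ListPipe -> [Float] -> IO [Float]
callLikeC sp xs =
  withArrayLen xs $ \n input ->
    allocaArray n $ \output -> do
      runPipe sp n input output
      peekArray n output

demo :: IO ()
demo = do
  sp <- newPipe 2
  callLikeC sp [1,2,3,4,5] >>= print    -- prints [0.0,0.0,1.0,2.0,3.0]
  callLikeC sp [6,7,8,9,10] >>= print   -- prints [4.0,5.0,6.0,7.0,8.0]
  freeStablePtr sp
```

Because the IORef is captured by the closure before newStablePtr is called, every dereference of the pointer sees the same state, which is the property the question's version was missing.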