14

This F# code is an attempt to solve Project Euler problem #58:

let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false 
| n -> 
       [3..2..(int (sqrt (float n)))] 
       |> List.tryFind (fun i -> n%i=0)
       |> Option.isNone
let spir = Seq.initInfinite (fun i -> 
    let n = i%4
    let a = 2 * (i/4 + 1)
    (a*n) + a + (a-1)*(a-1))
let rec accum se p n = 
   match se with
   | x when p*10 < n && p <> 0 -> 2*(n/4) + 1
   | x when is_prime (Seq.head x) -> accum (Seq.tail x) (inc p) (inc n)
   | x -> accum (Seq.tail x) p (inc n)
   | _ -> 0
printfn "%d" (accum spir 0 1)

I do not know the running time of this program because I refused to wait for it to finish. Instead, I wrote this code imperatively in C++:

#include "stdafx.h"
#include "math.h"
#include <iostream>

using namespace std;

int is_prime(int n)
{
    if (n % 2 == 0) return 0;
    for (int i = 3; i <= sqrt(n); i+=2)
    {
        if (n%i == 0)
        {
            return 0;
        }
    }
    return 1;
}

int spir(int i)
{
    int n = i % 4;
    int a = 2 * (i / 4 + 1);
    return (a*n) + a + ((a - 1)*(a - 1));
}

int main()
{
    int n = 1, p = 0, i = 0;
    cout << "start" << endl;
    while (p*10 >= n || p == 0)
    {
        p += is_prime(spir(i));
        n++; i++;
    }
    cout << 2*(i/4) + 1;

    return 0;
}

The above code runs in less than 2 seconds and gets the correct answer.

What is making the F# code run so slowly? Even after using some of the profiling tools mentioned in an old Stackoverflow post, I still cannot figure out what expensive operations are happening.

Edit #1

With rmunn's post, I was able to come up with a different implementation that gets the answer in a little under 30 seconds:

let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false 
| n -> 
       [3..2..(int (sqrt (float n)))] 
       |> List.tryFind (fun i -> n%i=0)
       |> Option.isNone
let spir2 = 
    List.unfold (fun state -> 
        let p = fst state
        let i = snd state
        let n = i%4
        let a = 2 * (i/4 + 1)
        let diag = (a*n) + a + (a-1)*(a-1)
        if p*10 < (i+1) && p <> 0 then 
            printfn "%d" (2*((i+1)/4) + 1)
            None
        elif is_prime diag then
            Some(diag, (inc p, inc i))
        else Some(diag, (p, inc i))) (0, 0)

Edit #2

With FuleSnabel's informative post, his is_prime function makes the above code run in under a tenth of a second, making it faster than the C++ code:

let inc = function
| n -> n + 1
let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        if (v % vv) <> 0 then
          loop (vv + 2)
        else
          false
      else
        true
    loop 3
let spir2 = 
    List.unfold (fun state -> 
        let p = fst state
        let i = snd state
        let n = i%4
        let a = 2 * (i/4 + 1)
        let diag = (a*n) + a + (a-1)*(a-1)
        if p*10 < (i+1) && p <> 0 then 
            printfn "%d" (2*((i+1)/4) + 1)
            None
        elif i <> 3 && is_prime diag then
            Some(diag, (inc p, inc i))
        else Some(diag, (p, inc i))) (0, 0)
Community
  • 1
  • 1
ljeabmreosn
  • 970
  • 1
  • 9
  • 26

2 Answers2

12

There is no Seq.tail function in the core F# library (UPDATE: Yes there is, see comments), so I assume you're using the Seq.tail function from FSharpx.Collections. If you're using a different implementation of Seq.tail, it's probably similar -- and it's almost certainly the cause of your problems, because it's not O(1) like you think it is. Getting the tail of a List is O(1) because of how List is implemented (as a series of cons cells). But getting the tail of a Seq ends up creating a brand new Seq from the original enumerable, discarding one item from it, and returning the rest of its items. When you go through your accum loop a second time, you call Seq.tail on that "skip 1 then return" seq. So now you have a Seq which I'll call S2, which asks S1 for an IEnumerable, skips the first item of S1, and returns the rest of it. S1, when asked for its first item, asks S0 (the original Seq) for an enumerable, skips its first item, then returns the rest of it. So for S2 to skip two items, it had to create two seqs. Now on your next run through when you ask for the Seq.tail of S2, you create S3 that asks S2 for an IEnumerable, which asks S1 for an IEnumerable, which asks S0 for an IEnumerable... and so on. This is effectively O(N^2), when you thought you were writing an O(N) operation.

I'm afraid I don't have time right now to figure out a solution for you; using List.tail won't help since you need an infinite sequence. But perhaps just knowing about the Seq.tail gotcha is enough to get you started, so I'll post this answer now even though it's not complete.

If you need more help, comment on this answer and I'll come back to it when I have time -- but that might not be for several days, so hopefully others will also answer your question.

rmunn
  • 34,942
  • 10
  • 74
  • 105
  • 2
    Thank you for this very informative answer! I'll see if I can make a different implementation. – ljeabmreosn Jun 12 '16 at 16:46
  • 1
    @munn: There _is_ `Seq.tail` in the FSharp core libraries. Source code is [here on GitHub](https://github.com/fsharp/fsharp/blob/37a100b7caafde0f4df5a1924c9f65f4a18277a8/src/fsharp/FSharp.Core/seq.fs#L1794) - it just seems that there is no MSDN documentation for it as of yet. I added an [issue](https://github.com/Microsoft/visualfsharpdocs/issues/180) to the relevant project. – Anton Schwaighofer Jun 13 '16 at 08:27
  • @AntonSchwaighofer - I knew that F# 4 had [normalized the APIs](https://github.com/fsharp/FSharpLangDesign/blob/master/FSharp-4.0/ListSeqArrayAdditions.md) of List, Seq and Array so that all the collection functions were available on all of them, so I should have realized that there was a `Seq.tail` now. It's the same implementation as the one from [FSharpx.Collections](https://github.com/fsprojects/FSharpx.Collections/blob/master/src/FSharpx.Collections/Collections.fs#L119-119), so thankfully my answer was still correct except for that first sentence. – rmunn Jun 15 '16 at 03:22
9

Writing performant F# is very possible but requires some knowledge of patterns that have high relative CPU cost in a tight loop. I recommend using tools like ILSpy to find hidden overhead.

For instance one could imagine F# exands this expression into an effective for loop:

[3..2..(int (sqrt (float n)))] 
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone

However it currently doesn't. Instead it creates a List that spans the range using intrinsic operators and passes that to List.tryFind. This is expensive when compared to the actual work we like to do (the modulus operation). ILSpy decompiles the code above into something like this:

public static bool is_prime(int _arg1)
{
  switch (_arg1)
  {
  case 2:
    return true;
  default:
    return _arg1 >= 2 && _arg1 % 2 != 0 && ListModule.TryFind<int>(new Program.Original.is_prime@10(_arg1), SeqModule.ToList<int>(Operators.CreateSequence<int>(Operators.OperatorIntrinsics.RangeInt32(3, 2, (int)Math.Sqrt((double)_arg1))))) == null;
  }
}

These operators aren't as performant as they could be (AFAIK this is currently being improved) but no matter how effecient allocating a List and then search it won't beat a for loop.

This means the is_prime is not as effective as it could be. Instead one could do something like this:

let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        (v % vv) <> 0 && loop (vv + 2)
      else
        true
    loop 3

This version of is_prime relies on tail call optimization in F# to expand the loop into an efficient for loop (you can see this using ILSpy). ILSpy decompile the loop into something like this:

while (vv <= stop)
{
  if (_arg1 % vv == 0)
  {
    return false;
  }
  int arg_13_0 = _arg1;
  int arg_11_0 = stop;
  vv += 2;
  stop = arg_11_0;
  _arg1 = arg_13_0;
}

This loop doesn't allocate memory and is just a rather efficient loop. One see some non-sensical assignments but hopefully the JIT:er eliminate those. I am sure is_prime can be improved even further.

When using Seq in performant code one have to keep in mind it's lazy and it doesn't use memoization by default (see Seq.cache). Therefore one might easily end up doing the same work over and over again (see @rmunn answer).

In addition Seq isn't especially effective because of how IEnumerable/IEnumerator are designed. Better options are for instance Nessos Streams (available on nuget).

In case you are interested I did a quick implementation that relies on a simple Push Stream which seems decently performant:

// Receiver<'T> is a callback that receives a value.
//  Returns true if it wants more values, false otherwise.
type Receiver<'T> = 'T -> bool
// Stream<'T> is function that accepts a Receiver<'T>
//  This means Stream<'T> is a push stream (as opposed to Seq that uses pull)
type Stream<'T>   = Receiver<'T> -> unit

// is_prime returns true if the input is prime, false otherwise
let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        (v % vv) <> 0 && loop (vv + 2)
      else
        true
    loop 3

// tryFind looks for the first value in the input stream for f v = true.
//  If found tryFind returns Some v, None otherwise
let tryFind f (s : Stream<'T>) : 'T option =
  let res = ref None
  s (fun v -> if f v then res := Some v; false else true)
  !res

// diagonals generates a tuple stream of all diagonal values
//  The first value is the side length, the second value is the diagonal value
let diagonals : Stream<int*int> =
  fun r ->
    let rec loop side v =
      let step  = side - 1
      if r (side, v + 1*step) && r (side, v + 2*step) && r (side, v + 3*step) && r (side, v + 4*step) then
        loop (side + 2) (v + 4*step)
    if r (1, 1) then loop 3 1

// ratio computes the streaming ratio for f v = true
let ratio f (s : Stream<'T>) : Stream<float*'T> =
  fun r ->
    let inc r = r := !r + 1.
    let acc   = ref 0.
    let count = ref 0.
    s (fun v -> (inc count; if f v then inc acc); r (!acc/(!count), v))

let result =
  diagonals
  |> ratio (snd >> is_prime)
  |> tryFind (fun (r, (_, v)) -> v > 1 && r < 0.1)
ildjarn
  • 62,044
  • 9
  • 127
  • 211
  • 1
    Thanks for a very in-depth post! I too used ILSpy to see if I could find anything suspiciously expensive. I didn't know much about the performance of IEnumerator. I just put your `is_prime` function in my code and now it runs under a second, faster than the C++ code! If `List.tryFind` does not compile into a simple iterative loop, why would I ever use it? – ljeabmreosn Jun 12 '16 at 18:56
  • `List.tryFind` is fine to use if one already have a list that one needs to search. The problem here is that one generates a list and then searches it. This is rather expensive when compared to the actual work that we like to to (a modulus operation). – Just another metaprogrammer Jun 12 '16 at 19:06
  • 1
    Updated the answer in order to clarify it's the list creation that is the biggest problem. – Just another metaprogrammer Jun 12 '16 at 19:09