append-other-scenarios
that has some failing tests for other edge cases. Going to fix that up shortly.
[Test, Category("LongRunning")]
public void be_return_empty_slice_with_position_if_asked_to_read_from_the_last_position()
{
var all = new List<RecordedEvent>();
var position = Position.Start;
AllEventsSlice slice;
while (!(slice = _conn.ReadAllEventsForwardAsync(position, 1, false).Result).IsEndOfStream)
{
all.Add(slice.Events.Single().Event);
position = slice.NextPosition;
}
Assert.That(EventDataComparer.Equal(_testEvents, all.Skip(all.Count - _testEvents.Length).ToArray()));
slice = _conn.ReadAllEventsForwardAsync(position, 1, false).Result;
Assert.That(slice.IsEndOfStream);
Assert.That(slice.Events.Length == 0);
Assert.That(slice.NextPosition == slice.FromPosition);
}
append-other-scenarios
try
{
var checkpointValue = (await dbConnection.QueryAsync<string>("SELECT checkpoint FROM projections.checkpoint;")).FirstOrDefault();
var checkpoint = checkpointValue != null ? new Checkpoint(checkpointValue) : Checkpoint.Start;
var events = await eventStore.ReadAll(checkpoint, 512);
foreach (var streamEvent in events.StreamEvents)
{
//dispatch events
var eventType = Type.GetType(streamEvent.Type, true, true);
var body = JsonSerializer.DeserializeFromString(streamEvent.JsonData, eventType) as IDomainEvent;
var headers = JsonSerializer.DeserializeFromString<IDictionary>(streamEvent.JsonMetadata);
var envelope = EventEnvelope.Create(body);
envelope.CopyHeaders(headers);
messageBus.Publish(envelope);
}
var nextCheckpoint = events.IsEnd ? Checkpoint.End : events.NextCheckpoint;
await dbConnection.ExecuteAsync("UPDATE projections.checkpoint SET checkpoint = @checkpoint;", new { checkpoint = nextCheckpoint.ToString() });
tx.Commit();
}
var nextCheckpoint = events.IsEnd ? Checkpoint.End : events.NextCheckpoint;
<-- this is completely wrong [Fact]
public async Task When_read_past_end_of_all()
{
using(var fixture = GetFixture())
{
using(var eventStore = await fixture.GetEventStore())
{
await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
bool isEnd = false;
int count = 0;
Checkpoint checkpoint = Checkpoint.Start;
while (!isEnd)
{
_testOutputHelper.WriteLine($"Loop {count}");
var streamEventsPage = await eventStore.ReadAll(checkpoint, 10);
_testOutputHelper.WriteLine($"FromCheckpoint = {streamEventsPage.FromCheckpoint}");
_testOutputHelper.WriteLine($"NextCheckpoint = {streamEventsPage.NextCheckpoint}");
_testOutputHelper.WriteLine($"IsEnd = {streamEventsPage.IsEnd}");
_testOutputHelper.WriteLine($"StreamEvents.Count = {streamEventsPage.StreamEvents.Count}");
_testOutputHelper.WriteLine("");
checkpoint = streamEventsPage.NextCheckpoint;
isEnd = streamEventsPage.IsEnd;
count++;
if(count > 100)
{
throw new Exception("omg wtf");
}
}
}
}
}
Loop 0
FromCheckpoint = 0/0
NextCheckpoint = 1624/1624
IsEnd = False
StreamEvents.Count = 3
Loop 1
FromCheckpoint = 1624/1624
NextCheckpoint = 2362/2362
IsEnd = False
StreamEvents.Count = 0
Loop 2
FromCheckpoint = 2362/2362
NextCheckpoint = 2362/2362
IsEnd = True
StreamEvents.Count = 0
while (!allEventsPage.IsEnd)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
}
try
{
var checkpointValue = (await dbConnection.QueryAsync<string>("SELECT checkpoint FROM projections.checkpoint;")).FirstOrDefault();
var checkpoint = checkpointValue != null ? new Checkpoint(checkpointValue) : Checkpoint.Start;
var events = await eventStore.ReadAll(checkpoint, 512);
foreach (var streamEvent in events.StreamEvents)
{
//dispatch events
var eventType = Type.GetType(streamEvent.Type, true, true);
var body = JsonSerializer.DeserializeFromString(streamEvent.JsonData, eventType) as IDomainEvent;
var headers = JsonSerializer.DeserializeFromString<IDictionary>(streamEvent.JsonMetadata);
var envelope = EventEnvelope.Create(body);
envelope.CopyHeaders(headers);
messageBus.Publish(envelope);
}
await dbConnection.ExecuteAsync("UPDATE projections.checkpoint SET checkpoint = @checkpoint;", new { checkpoint = events.NextCheckpoint });
tx.Commit();
}
[Fact]
public async Task Read_forwards_to_the_end_should_return_a_valid_Checkpoint()
{
using (var fixture = GetFixture())
{
using (var eventStore = await fixture.GetEventStore())
{
await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3, 4, 5, 6));
// read to the end of the stream
var allEventsPage = await eventStore.ReadAll(Checkpoint.Start, 4);
while (!allEventsPage.IsEnd)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
}
allEventsPage.IsEnd.Should().BeTrue();
Checkpoint currentCheckpoint = allEventsPage.NextCheckpoint;
currentCheckpoint.Should().NotBeNull();
// read end of stream again, should be empty, should return same checkpoint
allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
while (!allEventsPage.IsEnd)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
}
allEventsPage.StreamEvents.Should().BeEmpty();
allEventsPage.IsEnd.Should().BeTrue();
allEventsPage.NextCheckpoint.Should().NotBeNull();
currentCheckpoint = allEventsPage.NextCheckpoint;
// append some events then read again from the saved checkpoint, the next checkpoint should have moved
await eventStore.AppendToStream("stream-1", ExpectedVersion.Any, CreateNewStreamEvents(7, 8, 9));
allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
while (!allEventsPage.IsEnd)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
}
allEventsPage.IsEnd.Should().BeTrue();
allEventsPage.NextCheckpoint.Should().NotBeNull();
allEventsPage.NextCheckpoint.Should().NotBe(currentCheckpoint.Value);
}
}
}
while (!allEventsPage.IsEnd)
[Fact]
public async Task Read_forwards_to_the_end_should_return_a_valid_Checkpoint()
{
using (var fixture = GetFixture())
{
using (var eventStore = await fixture.GetEventStore())
{
await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3, 4, 5, 6));
// read to the end of the stream
var allEventsPage = await eventStore.ReadAll(Checkpoint.Start, 4);
int count = 0; //counter is used to short circuit bad implementations that never return IsEnd = true
while (!allEventsPage.IsEnd && count < 20)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
count++;
}
allEventsPage.IsEnd.Should().BeTrue();
Checkpoint currentCheckpoint = allEventsPage.NextCheckpoint;
currentCheckpoint.Should().NotBeNull();
// read end of stream again, should be empty, should return same checkpoint
allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
count = 0;
while (!allEventsPage.IsEnd && count < 20)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
count++;
}
allEventsPage.StreamEvents.Should().BeEmpty();
allEventsPage.IsEnd.Should().BeTrue();
allEventsPage.NextCheckpoint.Should().NotBeNull();
currentCheckpoint = allEventsPage.NextCheckpoint;
// append some events then read again from the saved checkpoint, the next checkpoint should have moved
await eventStore.AppendToStream("stream-1", ExpectedVersion.Any, CreateNewStreamEvents(7, 8, 9));
allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
count = 0;
while (!allEventsPage.IsEnd && count < 20)
{
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
count++;
}
allEventsPage.IsEnd.Should().BeTrue();
allEventsPage.NextCheckpoint.Should().NotBeNull();
allEventsPage.NextCheckpoint.Should().NotBe(currentCheckpoint.Value);
}
}
}
namespace Cedar.EventStore
{
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
public abstract partial class EventStoreAcceptanceTests
{
[Fact]
public async Task Can_read_to_end_of_allstream()
{
using(var fixture = GetFixture())
{
using(var eventStore = await fixture.GetEventStore())
{
await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
var lastAllEventPage = await ReadAllStreamToEnd(eventStore);
lastAllEventPage.StreamEvents.Should().BeEmpty();
lastAllEventPage.IsEnd.Should().BeTrue();
lastAllEventPage.NextCheckpoint.Should().Be(lastAllEventPage.FromCheckpoint);
}
}
}
[Fact]
public async Task When_read_to_end_of_allstream_then_append_event_then_can_read_to_end_of_allstream()
{
using (var fixture = GetFixture())
{
using (var eventStore = await fixture.GetEventStore())
{
await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
await ReadAllStreamToEnd(eventStore);
await eventStore.AppendToStream("stream-1", 2, CreateNewStreamEvents(1, 2, 3));
var lastAllEventPage = await ReadAllStreamToEnd(eventStore);
lastAllEventPage.StreamEvents.Should().BeEmpty();
lastAllEventPage.IsEnd.Should().BeTrue();
lastAllEventPage.NextCheckpoint.Should().Be(lastAllEventPage.NextCheckpoint);
}
}
}
private async Task<AllEventsPage> ReadAllStreamToEnd(IEventStore eventStore)
{
int pageSize = 4, count = 0;
var allEventsPage = await eventStore.ReadAll(Checkpoint.Start, pageSize);
LogAllEventsPage(allEventsPage);
while (!allEventsPage.IsEnd && count < 10)
{
_testOutputHelper.WriteLine($"Loop {count}");
allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, pageSize);
LogAllEventsPage(allEventsPage);
count++;
}
return allEventsPage;
}
private void LogAllEventsPage(AllEventsPage allEventsPage)
{
_testOutputHelper.WriteLine($"FromCheckpoint = {allEventsPage.FromCheckpoint}");
_testOutputHelper.WriteLine($"NextCheckpoint = {allEventsPage.NextCheckpoint}");
_testOutputHelper.WriteLine($"IsEnd = {allEventsPage.IsEnd}");
_testOutputHelper.WriteLine($"StreamEvents.Count = {allEventsPage.StreamEvents.Count}");
_testOutputHelper.WriteLine("");
}
}
}
"data", "metadata"
to "\"data\"", "\"metadata\""
since postgres checks for valid json on input