mirror of
https://github.com/evopro-ag/Sharp7Reactive.git
synced 2025-12-17 04:02:52 +00:00
Cleanup
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("Sharp7.Rx.Tests")]
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ using System.Collections.Generic;
|
||||
using System.Reactive;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using Sharp7.Rx.Extensions;
|
||||
|
||||
namespace Sharp7.Rx.Basics
|
||||
{
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
|
||||
namespace Sharp7.Rx.Basics
|
||||
{
|
||||
|
||||
@@ -1,21 +1,31 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.ComponentModel;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Reactive;
|
||||
using System.Reactive.Concurrency;
|
||||
using System.Reactive.Disposables;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sharp7.Rx.Resources;
|
||||
|
||||
namespace Sharp7.Rx.Extensions
|
||||
{
|
||||
internal static class ObservableExtensions
|
||||
{
|
||||
public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
|
||||
{
|
||||
return Observable.Create<T>(obs =>
|
||||
{
|
||||
var serialDisposable = new SerialDisposable();
|
||||
var subscription =
|
||||
source.Subscribe(
|
||||
item =>
|
||||
{
|
||||
serialDisposable.Disposable = item as IDisposable;
|
||||
obs.OnNext(item);
|
||||
},
|
||||
obs.OnError,
|
||||
obs.OnCompleted);
|
||||
return new CompositeDisposable(serialDisposable, subscription);
|
||||
});
|
||||
}
|
||||
|
||||
public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
|
||||
{
|
||||
return source
|
||||
@@ -25,24 +35,6 @@ namespace Sharp7.Rx.Extensions
|
||||
.Retry();
|
||||
}
|
||||
|
||||
public static IObservable<T> RetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
|
||||
}
|
||||
|
||||
public static IObservable<T> RepeatAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int repeatCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
|
||||
}
|
||||
|
||||
public static IObservable<T> LogAndRetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
ILogger logger,
|
||||
@@ -60,6 +52,24 @@ namespace Sharp7.Rx.Extensions
|
||||
return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler);
|
||||
}
|
||||
|
||||
public static IObservable<T> RepeatAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int repeatCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
|
||||
}
|
||||
|
||||
public static IObservable<T> RetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
|
||||
}
|
||||
|
||||
private static IObservable<T> RedoAfterDelay<T>(IObservable<T> source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func<IObservable<T>, IObservable<T>> reDo,
|
||||
Func<IObservable<T>, int, IObservable<T>> reDoCount)
|
||||
{
|
||||
@@ -69,23 +79,5 @@ namespace Sharp7.Rx.Extensions
|
||||
var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler)));
|
||||
return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs);
|
||||
}
|
||||
|
||||
public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
|
||||
{
|
||||
return Observable.Create<T>(obs =>
|
||||
{
|
||||
var serialDisposable = new SerialDisposable();
|
||||
var subscription =
|
||||
source.Subscribe(
|
||||
item =>
|
||||
{
|
||||
serialDisposable.Disposable = item as IDisposable;
|
||||
obs.OnNext(item);
|
||||
},
|
||||
obs.OnError,
|
||||
obs.OnCompleted);
|
||||
return new CompositeDisposable(serialDisposable, subscription);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sharp7.Rx.Enums;
|
||||
|
||||
namespace Sharp7.Rx.Interfaces
|
||||
{
|
||||
[NoReorder]
|
||||
public interface IPlc : IDisposable
|
||||
{
|
||||
IObservable<TValue> CreateNotification<TValue>(string variableName, TransmissionMode transmissionMode);
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using Sharp7.Rx.Enums;
|
||||
|
||||
namespace Sharp7.Rx.Interfaces
|
||||
{
|
||||
[NoReorder]
|
||||
internal interface IS7Connector : IDisposable
|
||||
{
|
||||
IObservable<ConnectionState> ConnectionState { get; }
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
using Sharp7.Rx.Enums;
|
||||
using JetBrains.Annotations;
|
||||
using Sharp7.Rx.Enums;
|
||||
|
||||
namespace Sharp7.Rx
|
||||
{
|
||||
[NoReorder]
|
||||
internal class S7VariableAddress
|
||||
{
|
||||
public Operand Operand { get; set; }
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Text.RegularExpressions;
|
||||
using Sharp7.Rx.Enums;
|
||||
using Sharp7.Rx.Interfaces;
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
{
|
||||
internal class PlcConnectionSettings
|
||||
{
|
||||
public string IpAddress { get; set; }
|
||||
public int RackNumber { get; set; }
|
||||
public int CpuMpiAddress { get; set; }
|
||||
public string IpAddress { get; set; }
|
||||
public int Port { get; set; }
|
||||
public int RackNumber { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" PrivateAssets="All"/>
|
||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
|
||||
<PackageReference Include="Sharp7" Version="1.1.84" />
|
||||
<PackageReference Include="System.Interactive" Version="6.0.1" />
|
||||
|
||||
@@ -18,47 +18,18 @@ namespace Sharp7.Rx
|
||||
{
|
||||
internal class Sharp7Connector : IS7Connector
|
||||
{
|
||||
private readonly IS7VariableNameParser variableNameParser;
|
||||
private readonly BehaviorSubject<ConnectionState> connectionStateSubject = new BehaviorSubject<ConnectionState>(Enums.ConnectionState.Initial);
|
||||
private readonly int cpuSlotNr;
|
||||
|
||||
private readonly CompositeDisposable disposables = new CompositeDisposable();
|
||||
private readonly LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism:1);
|
||||
private readonly string ipAddress;
|
||||
private readonly int rackNr;
|
||||
private readonly int cpuSlotNr;
|
||||
private readonly int port;
|
||||
|
||||
private S7Client sharp7;
|
||||
private readonly int rackNr;
|
||||
private readonly LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 1);
|
||||
private readonly IS7VariableNameParser variableNameParser;
|
||||
private bool disposed;
|
||||
|
||||
public ILogger Logger { get; set; }
|
||||
public async Task<Dictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames)
|
||||
{
|
||||
if (variableNames.IsEmpty())
|
||||
return new Dictionary<string, byte[]>();
|
||||
|
||||
var s7MultiVar = new S7MultiVar(sharp7);
|
||||
|
||||
var buffers = variableNames
|
||||
.Select(key => new {VariableName = key, Address = variableNameParser.Parse(key)})
|
||||
.Select(x =>
|
||||
{
|
||||
var buffer = new byte[x.Address.Length];
|
||||
s7MultiVar.Add(S7Consts.S7AreaDB, S7Consts.S7WLByte, x.Address.DbNr, x.Address.Start,x.Address.Length, ref buffer);
|
||||
return new { x.VariableName, Buffer = buffer};
|
||||
})
|
||||
.ToArray();
|
||||
|
||||
var result = await Task.Factory.StartNew(() => s7MultiVar.Read(), CancellationToken.None, TaskCreationOptions.None, scheduler);
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
throw new InvalidOperationException($"Error in MultiVar request for variables: {string.Join(",", variableNames)}");
|
||||
}
|
||||
|
||||
return buffers.ToDictionary(arg => arg.VariableName, arg => arg.Buffer);
|
||||
}
|
||||
|
||||
private S7Client sharp7;
|
||||
|
||||
|
||||
public Sharp7Connector(PlcConnectionSettings settings, IS7VariableNameParser variableNameParser)
|
||||
@@ -72,8 +43,14 @@ namespace Sharp7.Rx
|
||||
ReconnectDelay = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
public IObservable<ConnectionState> ConnectionState => connectionStateSubject.DistinctUntilChanged().AsObservable();
|
||||
|
||||
public ILogger Logger { get; set; }
|
||||
|
||||
public TimeSpan ReconnectDelay { get; set; }
|
||||
|
||||
private bool IsConnected => connectionStateSubject.Value == Enums.ConnectionState.Connected;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
@@ -103,8 +80,6 @@ namespace Sharp7.Rx
|
||||
return false;
|
||||
}
|
||||
|
||||
public IObservable<ConnectionState> ConnectionState => connectionStateSubject.DistinctUntilChanged().AsObservable();
|
||||
|
||||
|
||||
public async Task Disconnect()
|
||||
{
|
||||
@@ -112,6 +87,33 @@ namespace Sharp7.Rx
|
||||
await CloseConnection();
|
||||
}
|
||||
|
||||
public async Task<Dictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames)
|
||||
{
|
||||
if (variableNames.IsEmpty())
|
||||
return new Dictionary<string, byte[]>();
|
||||
|
||||
var s7MultiVar = new S7MultiVar(sharp7);
|
||||
|
||||
var buffers = variableNames
|
||||
.Select(key => new {VariableName = key, Address = variableNameParser.Parse(key)})
|
||||
.Select(x =>
|
||||
{
|
||||
var buffer = new byte[x.Address.Length];
|
||||
s7MultiVar.Add(S7Consts.S7AreaDB, S7Consts.S7WLByte, x.Address.DbNr, x.Address.Start, x.Address.Length, ref buffer);
|
||||
return new {x.VariableName, Buffer = buffer};
|
||||
})
|
||||
.ToArray();
|
||||
|
||||
var result = await Task.Factory.StartNew(() => s7MultiVar.Read(), CancellationToken.None, TaskCreationOptions.None, scheduler);
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
throw new InvalidOperationException($"Error in MultiVar request for variables: {string.Join(",", variableNames)}");
|
||||
}
|
||||
|
||||
return buffers.ToDictionary(arg => arg.VariableName, arg => arg.Buffer);
|
||||
}
|
||||
|
||||
public Task InitializeAsync()
|
||||
{
|
||||
try
|
||||
@@ -138,6 +140,64 @@ namespace Sharp7.Rx
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
|
||||
public async Task<byte[]> ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var buffer = new byte[bytesToRead];
|
||||
|
||||
var area = FromOperand(operand);
|
||||
|
||||
var result =
|
||||
await Task.Factory.StartNew(() => sharp7.ReadArea(area, dBNr, startByteAddress, bytesToRead, S7Consts.S7WLByte, buffer), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
var errorText = sharp7.ErrorText(result);
|
||||
throw new InvalidOperationException($"Error reading {operand}{dBNr}:{startByteAddress}->{bytesToRead} ({errorText})");
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public async Task<bool> WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var buffer = new[] {value ? (byte) 0xff : (byte) 0};
|
||||
|
||||
var offsetStart = (startByteAddress * 8) + bitAdress;
|
||||
|
||||
var result = await Task.Factory.StartNew(() => sharp7.WriteArea(FromOperand(operand), dbNr, offsetStart, 1, S7Consts.S7WLBit, buffer), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
return (false);
|
||||
}
|
||||
|
||||
return (true);
|
||||
}
|
||||
|
||||
public async Task<ushort> WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var result = await Task.Factory.StartNew(() => sharp7.WriteArea(FromOperand(operand), dBNr, startByteAdress, data.Length, S7Consts.S7WLByte, data), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (ushort) (data.Length);
|
||||
}
|
||||
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
@@ -169,6 +229,18 @@ namespace Sharp7.Rx
|
||||
await Task.Factory.StartNew(() => sharp7.Disconnect(), CancellationToken.None, TaskCreationOptions.None, scheduler);
|
||||
}
|
||||
|
||||
private void EnsureConnectionValid()
|
||||
{
|
||||
if (disposed)
|
||||
throw new ObjectDisposedException("S7Connector");
|
||||
|
||||
if (sharp7 == null)
|
||||
throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized);
|
||||
|
||||
if (!IsConnected)
|
||||
throw new InvalidOperationException("Plc is not connected");
|
||||
}
|
||||
|
||||
private bool EvaluateErrorCode(int errorCode)
|
||||
{
|
||||
if (errorCode == 0)
|
||||
@@ -186,6 +258,23 @@ namespace Sharp7.Rx
|
||||
return false;
|
||||
}
|
||||
|
||||
private int FromOperand(Operand operand)
|
||||
{
|
||||
switch (operand)
|
||||
{
|
||||
case Operand.Input:
|
||||
return S7Consts.S7AreaPE;
|
||||
case Operand.Output:
|
||||
return S7Consts.S7AreaPA;
|
||||
case Operand.Marker:
|
||||
return S7Consts.S7AreaMK;
|
||||
case Operand.Db:
|
||||
return S7Consts.S7AreaDB;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException(nameof(operand), operand, null);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> Reconnect()
|
||||
{
|
||||
await CloseConnection();
|
||||
@@ -204,91 +293,5 @@ namespace Sharp7.Rx
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
|
||||
private bool IsConnected => connectionStateSubject.Value == Enums.ConnectionState.Connected;
|
||||
|
||||
public async Task<byte[]> ReadBytes(Operand operand, ushort startByteAddress, ushort bytesToRead, ushort dBNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var buffer = new byte[bytesToRead];
|
||||
|
||||
var area = FromOperand(operand);
|
||||
|
||||
var result =
|
||||
await Task.Factory.StartNew(() => sharp7.ReadArea(area, dBNr, startByteAddress, bytesToRead, S7Consts.S7WLByte, buffer), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
var errorText = sharp7.ErrorText(result);
|
||||
throw new InvalidOperationException($"Error reading {operand}{dBNr}:{startByteAddress}->{bytesToRead} ({errorText})");
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private int FromOperand(Operand operand)
|
||||
{
|
||||
switch (operand)
|
||||
{
|
||||
case Operand.Input:
|
||||
return S7Consts.S7AreaPE;
|
||||
case Operand.Output:
|
||||
return S7Consts.S7AreaPA;
|
||||
case Operand.Marker:
|
||||
return S7Consts.S7AreaMK;
|
||||
case Operand.Db:
|
||||
return S7Consts.S7AreaDB;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException(nameof(operand), operand, null);
|
||||
}
|
||||
}
|
||||
|
||||
private void EnsureConnectionValid()
|
||||
{
|
||||
if (disposed)
|
||||
throw new ObjectDisposedException("S7Connector");
|
||||
|
||||
if (sharp7 == null)
|
||||
throw new InvalidOperationException(StringResources.StrErrorS7DriverNotInitialized);
|
||||
|
||||
if (!IsConnected)
|
||||
throw new InvalidOperationException("Plc is not connected");
|
||||
}
|
||||
|
||||
public async Task<ushort> WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var result = await Task.Factory.StartNew(() => sharp7.WriteArea(FromOperand(operand), dBNr, startByteAdress, data.Length, S7Consts.S7WLByte, data), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
return 0;
|
||||
}
|
||||
return (ushort)(data.Length);
|
||||
}
|
||||
public async Task<bool> WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token)
|
||||
{
|
||||
EnsureConnectionValid();
|
||||
|
||||
var buffer = new byte[] { value ? (byte)0xff : (byte)0 };
|
||||
|
||||
var offsetStart = (startByteAddress * 8) + bitAdress;
|
||||
|
||||
var result = await Task.Factory.StartNew(() => sharp7.WriteArea(FromOperand(operand), dbNr, offsetStart, 1, S7Consts.S7WLBit, buffer), token, TaskCreationOptions.None, scheduler);
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
EvaluateErrorCode(result);
|
||||
return (false);
|
||||
}
|
||||
return (true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,13 +19,13 @@ namespace Sharp7.Rx
|
||||
{
|
||||
public class Sharp7Plc : IPlc
|
||||
{
|
||||
protected readonly CompositeDisposable Disposables = new CompositeDisposable();
|
||||
private readonly ConcurrentSubjectDictionary<string, byte[]> multiVariableSubscriptions = new ConcurrentSubjectDictionary<string, byte[]>(StringComparer.InvariantCultureIgnoreCase);
|
||||
private readonly List<long> performanceCoutner = new List<long>(1000);
|
||||
private readonly PlcConnectionSettings plcConnectionSettings;
|
||||
private readonly IS7VariableNameParser varaibleNameParser = new CacheVariableNameParser(new S7VariableNameParser());
|
||||
private bool disposed;
|
||||
private IS7Connector s7Connector;
|
||||
private readonly PlcConnectionSettings plcConnectionSettings;
|
||||
private readonly ConcurrentSubjectDictionary<string, byte[]> multiVariableSubscriptions = new ConcurrentSubjectDictionary<string, byte[]>(StringComparer.InvariantCultureIgnoreCase);
|
||||
protected readonly CompositeDisposable Disposables = new CompositeDisposable();
|
||||
private readonly List<long> performanceCoutner = new List<long>(1000);
|
||||
|
||||
|
||||
/// <summary>
|
||||
@@ -48,7 +48,7 @@ namespace Sharp7.Rx
|
||||
/// </param>
|
||||
public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress, int port = 102, TimeSpan? multiVarRequestCycleTime = null)
|
||||
{
|
||||
plcConnectionSettings = new PlcConnectionSettings() { IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port };
|
||||
plcConnectionSettings = new PlcConnectionSettings {IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port};
|
||||
|
||||
if (multiVarRequestCycleTime != null && multiVarRequestCycleTime > TimeSpan.FromMilliseconds(5))
|
||||
MultiVarRequestCycleTime = multiVarRequestCycleTime.Value;
|
||||
@@ -56,6 +56,60 @@ namespace Sharp7.Rx
|
||||
|
||||
public IObservable<ConnectionState> ConnectionState { get; private set; }
|
||||
public ILogger Logger { get; set; }
|
||||
public TimeSpan MultiVarRequestCycleTime { get; } = TimeSpan.FromSeconds(0.1);
|
||||
|
||||
public int MultiVarRequestMaxItems { get; set; } = 16;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
public IObservable<TValue> CreateNotification<TValue>(string variableName, TransmissionMode transmissionMode)
|
||||
{
|
||||
return Observable.Create<TValue>(observer =>
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
|
||||
var disposables = new CompositeDisposable();
|
||||
var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName);
|
||||
disposeableContainer.AddDisposableTo(disposables);
|
||||
|
||||
var observable = disposeableContainer.Observable
|
||||
.Select(bytes => S7ValueConverter.ConvertToType<TValue>(bytes, address));
|
||||
|
||||
if (transmissionMode == TransmissionMode.OnChange)
|
||||
observable = observable.DistinctUntilChanged();
|
||||
|
||||
observable.Subscribe(observer)
|
||||
.AddDisposableTo(disposables);
|
||||
|
||||
return disposables;
|
||||
});
|
||||
}
|
||||
|
||||
public Task<TValue> GetValue<TValue>(string variableName)
|
||||
{
|
||||
return GetValue<TValue>(variableName, CancellationToken.None);
|
||||
}
|
||||
|
||||
|
||||
public Task SetValue<TValue>(string variableName, TValue value)
|
||||
{
|
||||
return SetValue(variableName, value, CancellationToken.None);
|
||||
}
|
||||
|
||||
|
||||
public async Task<TValue> GetValue<TValue>(string variableName, CancellationToken token)
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
|
||||
var data = await s7Connector.ReadBytes(address.Operand, address.Start, address.Length, address.DbNr, token);
|
||||
return S7ValueConverter.ConvertToType<TValue>(data, address);
|
||||
}
|
||||
|
||||
public async Task<bool> InitializeAsync()
|
||||
{
|
||||
@@ -84,28 +138,6 @@ namespace Sharp7.Rx
|
||||
return true;
|
||||
}
|
||||
|
||||
public Task<TValue> GetValue<TValue>(string variableName)
|
||||
{
|
||||
return GetValue<TValue>(variableName, CancellationToken.None);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public async Task<TValue> GetValue<TValue>(string variableName, CancellationToken token)
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
|
||||
var data = await s7Connector.ReadBytes(address.Operand, address.Start, address.Length, address.DbNr, token);
|
||||
return S7ValueConverter.ConvertToType<TValue>(data, address);
|
||||
}
|
||||
|
||||
|
||||
public Task SetValue<TValue>(string variableName, TValue value)
|
||||
{
|
||||
return SetValue(variableName, value, CancellationToken.None);
|
||||
}
|
||||
|
||||
public async Task SetValue<TValue>(string variableName, TValue value, CancellationToken token)
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
@@ -139,7 +171,7 @@ namespace Sharp7.Rx
|
||||
else if (typeof(TValue) == typeof(float))
|
||||
{
|
||||
var buffer = new byte[sizeof(float)];
|
||||
S7.SetRealAt(buffer, 0, (float)(object)value);
|
||||
buffer.SetRealAt(0, (float) (object) value);
|
||||
await s7Connector.WriteBytes(address.Operand, address.Start, buffer, address.DbNr, token);
|
||||
}
|
||||
else if (typeof(TValue) == typeof(string))
|
||||
@@ -173,36 +205,6 @@ namespace Sharp7.Rx
|
||||
}
|
||||
}
|
||||
|
||||
public IObservable<TValue> CreateNotification<TValue>(string variableName, TransmissionMode transmissionMode)
|
||||
{
|
||||
return Observable.Create<TValue>(observer =>
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
|
||||
var disposables = new CompositeDisposable();
|
||||
var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName);
|
||||
disposeableContainer.AddDisposableTo(disposables);
|
||||
|
||||
var observable = disposeableContainer.Observable
|
||||
.Select(bytes => S7ValueConverter.ConvertToType<TValue>(bytes, address));
|
||||
|
||||
if (transmissionMode == TransmissionMode.OnChange)
|
||||
observable = observable.DistinctUntilChanged();
|
||||
|
||||
observable.Subscribe(observer)
|
||||
.AddDisposableTo(disposables);
|
||||
|
||||
return disposables;
|
||||
});
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
protected virtual void Dispose(bool disposing)
|
||||
{
|
||||
if (disposed) return;
|
||||
@@ -223,21 +225,6 @@ namespace Sharp7.Rx
|
||||
}
|
||||
}
|
||||
|
||||
~Sharp7Plc()
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
|
||||
private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle)
|
||||
{
|
||||
return ConnectionState.FirstAsync()
|
||||
.Select(states => states == Enums.ConnectionState.Connected)
|
||||
.SelectMany(connected => GetAllValues(connected, connector))
|
||||
.RepeatAfterDelay(MultiVarRequestCycleTime)
|
||||
.LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc")
|
||||
.Subscribe();
|
||||
}
|
||||
|
||||
private async Task<Unit> GetAllValues(bool connected, IS7Connector connector)
|
||||
{
|
||||
if (!connected)
|
||||
@@ -272,13 +259,26 @@ namespace Sharp7.Rx
|
||||
var min = performanceCoutner.Min();
|
||||
var max = performanceCoutner.Max();
|
||||
|
||||
Logger?.LogTrace("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress, multiVariableSubscriptions.ExistingKeys.Count(),
|
||||
Logger?.LogTrace("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress,
|
||||
multiVariableSubscriptions.ExistingKeys.Count(),
|
||||
MultiVarRequestMaxItems);
|
||||
performanceCoutner.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
public int MultiVarRequestMaxItems { get; set; } = 16;
|
||||
public TimeSpan MultiVarRequestCycleTime { get; private set; } = TimeSpan.FromSeconds(0.1);
|
||||
private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle)
|
||||
{
|
||||
return ConnectionState.FirstAsync()
|
||||
.Select(states => states == Enums.ConnectionState.Connected)
|
||||
.SelectMany(connected => GetAllValues(connected, connector))
|
||||
.RepeatAfterDelay(MultiVarRequestCycleTime)
|
||||
.LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc")
|
||||
.Subscribe();
|
||||
}
|
||||
|
||||
~Sharp7Plc()
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user