当前位置: 移动技术网 > IT编程>开发语言>c# > 简单实现C#异步操作

简单实现C#异步操作

2019年07月18日  | 移动技术网IT编程  | 我要评论
在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及task等异步编程。因此,为了以后更方便的进行异步

在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及task等异步编程。因此,为了以后更方便的进行异步方式的开发,我封装实现了异步编程框架,通过begininvoke、endinvoke的方式实现异步编程。

一、框架结构

整个框架包括四个部分

1、基类抽象opeartor
我把每个异步执行过程称为一个operate,因此需要一个opeartor去执行
2、funcasync
异步的func
3、actionasync
异步的action
4、asynchorus
对actionasync和funcasync的封装

operator
operator是一个抽象类,实现了ioperationasync和icontinuewithasync两个接口。
ioperationasync实现了异步操作,icontinuewithasync实现了类似于task的continuewith方法,在当前异步操作完成后继续进行的操作

ioperationasync接口详解

public interface ioperationasync
{
  iasyncresult invoke();
  void wait();
  void completedcallback(iasyncresult ar);
  void catchexception(exception exception);
}
  • invoke():异步方法的调用
  • wait():等待异步操作执行
  • completedcallback():操作完成回调
  • catchexception():抓取异常

icontinuewithasync接口详情

public interface icontinuewithasync
{
  operator previous { get; set; }
  operator next { get; set; }
  operator continuewithasync(action action);
  operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter);
}

previous:前一个操作
next:下一个操作
continuewithasync():异步继续操作

public abstract class operator : ioperationasync, icontinuewithasync
{
  public iasyncresult middle;
  public readonly string id;
  public exception exception { get; private set; }
  public operator previous { get; set; }
  public operator next { get; set; }
  protected operator()
  {
    id = guid.newguid().tostring();
  }
  public abstract iasyncresult invoke();
  protected void setasyncresult(iasyncresult result)
  {
    this.middle = result;
  }
  public virtual void wait()
  {
    if (!middle.iscompleted) middle.asyncwaithandle.waitone();
  }
  public virtual void completedcallback(iasyncresult ar)
  {
  }
  public void catchexception(exception exception)
  {
    this.exception = exception;
  }
  protected operator continueasync()
  {
    if (next != null) next.invoke();
    return next;
  }
  public virtual operator continuewithasync(action action)
  {
    next = new actionasync(action);
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tparameter>(action<tparameter> action, tparameter parameter)
  {
    next = new actionasync<tparameter>(action, parameter);
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tresult>(func<tresult> func)
  {
    next = new funcasync<tresult>();
    next.previous = this;
    return next;
  }
  public virtual operator continuewithasync<tparameter, tresult>(func<tparameter, tresult> func,
    tparameter parameter)
  {
    next = new funcasync<tparameter, tresult>(func, parameter);
    next.previous = this;
    return next;
  }
}

无返回异步操作
actionasync

public class actionasync : operator
{
  private readonly action _action;
  protected actionasync()
  {
  }
  public actionasync(action action)
    : this()
  {
    this._action = action;
  }
  public override iasyncresult invoke()
  {
    var middle = _action.begininvoke(completedcallback, null);
    setasyncresult(middle);
    return middle;
  }
  public override void completedcallback(iasyncresult ar)
  {
    try
    {
      _action.endinvoke(ar);
    }
    catch (exception exception)
    {
      this.catchexception(exception);
    }
    continueasync();
  }
}
public class actionasync<t> : actionasync
{
  public t result;
  private readonly action<t> _action1;
  protected readonly t parameter1;
  public actionasync()
  {
  }
  public actionasync(t parameter)
  {
    this.parameter1 = parameter;
  }
  public actionasync(action<t> action, t parameter)
  {
    this._action1 = action;
    this.parameter1 = parameter;
  }
  public override iasyncresult invoke()
  {
    var result = _action1.begininvoke(parameter1, completedcallback, null);
    setasyncresult(result);
    return result;
  }
  public override void completedcallback(iasyncresult ar)
  {
    try
    {
      _action1.endinvoke(ar);
    }
    catch (exception exception)
    {
      this.catchexception(exception);
    }
    continueasync();
  }
}

有返回异步
funcasync实现了ifuncoperationasync接口

ifuncoperationasync

public interface ifuncoperationasync<t>
{
  void setresult(t result);
  t getresult();
}
  • setresult(t result):异步操作完成设置返回值
  • getresult():获取返回值

1)、funcasync

public class funcasync<tresult> : operator, ifuncoperationasync<tresult>
{
private tresult _result;

public tresult result
{
  get
  {
    if (!middle.iscompleted || _result == null)
    {
      _result = getresult();
    }
    return _result;
  }
}
private readonly func<tresult> _func1;
public funcasync()
{
}
public funcasync(func<tresult> func)
{
  this._func1 = func;
}
public override iasyncresult invoke()
{
  var result = _func1.begininvoke(completedcallback, null);
  setasyncresult(result);
  return result;
}
public override void completedcallback(iasyncresult ar)
{
  try
  {
    var result = _func1.endinvoke(ar);
    setresult(result);
  }
  catch (exception exception)
  {
    this.catchexception(exception);
    setresult(default(tresult));
  }
  continueasync();
}
public virtual tresult getresult()
{
  wait();
  return this._result;
}
public void setresult(tresult result)
{
  _result = result;
}
}
public class funcasync<t1, tresult> : funcasync<tresult>
{
protected readonly t1 parameter1;
private readonly func<t1, tresult> _func2;
public funcasync(func<t1, tresult> action, t1 parameter1)
  : this(parameter1)
{
  this._func2 = action;
}
protected funcasync(t1 parameter1)
  : base()
{
  this.parameter1 = parameter1;
}
public override iasyncresult invoke()
{
  var result = _func2.begininvoke(parameter1, completedcallback, null);
  setasyncresult(result);
  return result;
}
public override void completedcallback(iasyncresult ar)
{
  try
  {
    var result = _func2.endinvoke(ar);
    setresult(result);
  }
  catch (exception exception)
  {
    catchexception(exception);
    setresult(default(tresult));
  }
  continueasync();
}
}

asynchronous 异步操作封装
actionasync和funcasync为异步操作打下了基础,接下来最重要的工作就是通过这两个类执行我们的异步操作,为此我封装了一个异步操作类
主要封装了以下几个部分:

  • waitall(ienumerable<operator> operations):等待所有操作执行完毕
  • waitany(ienumerable<operator> operations):等待任意操作执行完毕
  • actionasync
  • funcasync
  • continuewithaction
  • continuewithfunc

后面四个包含若干个重载,这里只是笼统的代表一个类型的方法
waitall

public static void waitall(ienumerable<operator> operations)
{
foreach (var @operator in operations)
{
  @operator.wait();
}
}

waitany

public static void waitany(ienumerable<operator> operations)
{
while (operations.all(o => !o.middle.iscompleted))
  thread.sleep(100);
}

等待时间可以自定义
actioninvoke

public static operator invoke(action action)
{
operator operation = new actionasync(action);
operation.invoke();
return operation;
}
public static operator invoke<t>(action<t> action, t parameter)
{
operator operation = new actionasync<t>(action, parameter);
operation.invoke();
return operation;
}
public static operator invoke<t1, t2>(action<t1, t2> action, t1 parameter1, t2 parameter2)
{
operator operation = new actionasync<t1, t2>(action, parameter1, parameter2);
operation.invoke();
return operation;
}

funcinvoke

public static operator invoke<tresult>(func<tresult> func)
{
operator operation = new funcasync<tresult>(func);
operation.invoke();
return operation;
}
public static operator invoke<tparameter, tresult>(func<tparameter, tresult> func, tparameter parameter)
{
tparameter param = parameter;
operator operation = new funcasync<tparameter, tresult>(func, param);
operation.invoke();
return operation;
}
public static operator invoke<t1, t2, tresult>(func<t1, t2, tresult> func, t1 parameter1, t2 parameter2)
{
operator operation = new funcasync<t1, t2, tresult>(func, parameter1, parameter2);
operation.invoke();
return operation;
}

continuewithaction

public static operator continuewithasync(ienumerable<operator>operators, action action)
{
return invoke(waitall, operators)
  .continuewithasync(action);
}
public static operator continuewithasync<tparameter>(ienumerable<operator> operators, action<tparameter> action, tparameter parameter)
{
return invoke(waitall, operators)
  .continuewithasync(action, parameter);
}

continuewithfunc

public static operator continuewithasync<tresult>(ienumerable<operator> operators,func<tresult> func)
{
return invoke(waitall, operators)
  .continuewithasync(func);
}
public static operator continuewithasync<tparameter, tresult>(ienumerable<operator> operators, 
func<tparameter, tresult> func, tparameter parameter)
{
return invoke(waitall, operators)
  .continuewithasync(func, parameter);
}

这里有个bug当调用continuewithasync后无法调用wait等待,本来wait需要从前往后等待每个异步操作,但是测试了下不符合预期结果。不过理论上来说应该无需这样操作,continuewithasync只是为了当上一个异步操作执行完毕时继续执行的异步操作,若要等待,那不如两个操作放到一起,最后再等待依然可以实现。
前面的都是单步异步操作的调用,若需要对某集合进行某个方法的异步操作,可以foreach遍历

public void foreachasync(ienumerbale<string> parameters)
{
  foreach(string p in parameters)
  {
    asynchronous.invoke(tast,p);
  }
}
public void test(string parameter)
{
  //todo:做一些事
}

每次都需要去手写foreach,比较麻烦,因此实现类似于plinq的并行计算方法实在有必要,不过有一点差别,plinq是采用多核cpu进行并行计算,而我封装的仅仅遍历集合进行异步操作而已
foreachaction

public static ienumerable<operator> foreach<tparameter>(ienumerable<tparameter> items, action<tparameter> action)
{
  return items.select(t => invoke(action, t)).tolist();
}

foreachfunc

public static ienumerable<operator> foreach<tparameter, tresult>(ienumerable<tparameter> items, func<tparameter, tresult> func)
{
  return items.select(parameter => invoke(func, parameter)).tolist();
}

如何使用
无返回值异步方法调用

public void dosomething()
{
//todo:
}

通过asynchronous.invoke(dosomething) 执行

public void dosomething(string parameter)
{
//todo:
}

通过asynchronous.invoke(dosomething,parameter) 执行

有返回值异步方法调用

public string dosomething()
{
//todo:
}

通过asynchronous.invoke(()=>dosomething())执行

public string dosomething(string parameter)
{
//todo:
}

通过asynchronous.invoke(()=>dosomething(parameter))执行,或者也可以传入参数通过asynchronous.invoke(p=>dosomething(p),parameter)

无返回值foreach

public void test
{
int[] parameters = {1,2,3,4,5};
asynchronous.foreach(parameters,console.writeline);
}

有返回值foreach

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchrous.waitall(operators);
asynchronous.foreach(operators.cast<funcasync<int,int>>(),
  p=> console.writeline(p.result));
}

首先将集合每个值扩大2倍,然后输出
异步执行完再执行

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchrous.continuewithasync(operators,console.writeline,"执行完成");
}

每次执行完继续执行
可能有时候我们需要遍历一个集合,每个元素处理完成后我们需要输出xx处理完成

public void test
{
int[] parameters = {1,2,3,4,5};
var operators = asynchronous.foreach(parameters,p=> p*2);
asynchronous.foreach(operators,o=>{
  o.continuewithasync(()={
    //每个元素执行完时执行
    if(o.exception != null)
    {
      //之前执行时产生未处理的异常,这里可以捕获到 
    }
  });
});
}

可以实现链式异步操作

public void chain()
{
asynchronous.invoke(console.writeline,1)
.continuewithasync(console.writeline,2)
.continuewithasync(console.writeline,3)
}

这样会按步骤输出1,2,3
结束语

以上只是列出了部分重载方法,其他重载方法无非就是加参数,本质实际是一样的。

希望对大家的学习有所帮助,在这祝大家新年快乐,新的一年大家一起努力。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网