00001 #include <stdlib.h>
00002 #include <string.h>
00003 #include "converse.h"
00004
00005 typedef struct Cfuture_data_s
00006 {
00007 void *value;
00008 int ready;
00009 CthThread waiters;
00010 }
00011 *futdata;
00012
00013 typedef struct CfutureValue_s
00014 {
00015 char core[CmiMsgHeaderSizeBytes];
00016 struct Cfuture_data_s *data;
00017 int valsize;
00018 double rest[1];
00019 }
00020 *CfutureValue;
00021
/*
 * field_offset(t, f): byte offset of member f inside the struct that the
 * pointer type t points to.
 *
 * The ADDRESS of the member is taken, so the member itself is never read.
 * (The previous form evaluated the member, which only produced an offset for
 * array members through array-to-pointer decay and dereferenced a null
 * pointer for any scalar member.)
 */
#define field_offset(t, f) ((size_t)&(((t)0)->f))

/*
 * void_to_value(v): given a pointer to the `rest` payload of a CfutureValue
 * (as handed out by CfutureCreateBuffer / CfutureWait), step back to the
 * start of the enclosing CfutureValue block.
 */
#define void_to_value(v) \
  ((CfutureValue)(((char *)(v)) - field_offset(CfutureValue, rest)))
00024
/*
 * Converse Cpv variable holding the handler index for CfutureStore.
 * CfutureModuleInit initializes it with the value returned by
 * CmiRegisterHandler, and CfutureCreateBuffer stamps it on every buffer it
 * allocates via CmiSetHandler.
 */
00025 CpvDeclare(int, CfutureStoreIndex);
00026
00027 Cfuture CfutureCreate(void)
00028 {
00029 futdata data = (futdata)malloc(sizeof(struct Cfuture_data_s));
00030 Cfuture result;
00031 _MEMCHECK(data);
00032 data->value = 0;
00033 data->ready = 0;
00034 data->waiters = 0;
00035 result.pe = CmiMyPe();
00036 result.data = data;
00037 return result;
00038 }
00039
00040 static void CfutureAwaken(futdata data, CfutureValue val)
00041 {
00042 CthThread t;
00043 data->value = val;
00044 data->ready = 1;
00045 for (t=data->waiters; t; t=CthGetNext(t))
00046 CthAwaken(t);
00047 data->waiters=0;
00048 }
00049
00050 static void CfutureStore(CfutureValue m)
00051 {
00052 CfutureAwaken(m->data, m);
00053 }
00054
00055 void *CfutureCreateBuffer(int bytes)
00056 {
00057 int valsize = sizeof(struct CfutureValue_s) + bytes;
00058 CfutureValue m = (CfutureValue)CmiAlloc(valsize);
00059 CmiSetHandler(m, CpvAccess(CfutureStoreIndex));
00060 m->valsize = valsize;
00061 return (void*)(m->rest);
00062 }
00063
00064 void CfutureDestroyBuffer(void *v)
00065 {
00066 CmiFree(v);
00067 }
00068
00069 void CfutureStoreBuffer(Cfuture f, void *value)
00070 {
00071 CfutureValue m = void_to_value(value);
00072 if (f.pe == CmiMyPe()) {
00073 CfutureAwaken(f.data, m);
00074 } else {
00075 m->data = f.data;
00076 CmiSyncSendAndFree(f.pe, m->valsize, m);
00077 }
00078 }
00079
00080 void CfutureSet(Cfuture f, void *value, int len)
00081 {
00082 void *copy = CfutureCreateBuffer(len);
00083 memcpy(copy, value, len);
00084 CfutureStoreBuffer(f, copy);
00085 }
00086
00087 void *CfutureWait(Cfuture f)
00088 {
00089 CthThread self; CfutureValue value; futdata data;
00090 if (f.pe != CmiMyPe()) {
00091 CmiPrintf("error: CfutureWait: future not local.\n");
00092 exit(1);
00093 }
00094 data = f.data;
00095 if (data->ready == 0) {
00096 self = CthSelf();
00097 CthSetNext(self, data->waiters);
00098 data->waiters = self;
00099 CthSuspend();
00100 }
00101 value = (CfutureValue)data->value;
00102 return (void*)(value->rest);
00103 }
00104
00105 void CfutureDestroy(Cfuture f)
00106 {
00107 if (f.pe != CmiMyPe()) {
00108 CmiPrintf("error: CfutureDestroy: future not local.\n");
00109 exit(1);
00110 }
00111 if (f.data->waiters) {
00112 CmiPrintf("error: CfutureDestroy: destroying an active future.\n");
00113 exit(1);
00114 }
00115 if (f.data->value) CmiFree(f.data->value);
00116 free(f.data);
00117 }
00118
00119 extern "C" void CfutureModuleInit(void)
00120 {
00121 CpvInitialize(int, CfutureStoreIndex);
00122 CpvAccess(CfutureStoreIndex) = CmiRegisterHandler((CmiHandler)CfutureStore);
00123 }