Check-in [590cb5e905]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: nanomsg error handling and other fixes
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 590cb5e90598a4238305665925ee77c32573f8ed
User & Date: ovenpasta@pizzahack.eu 2016-08-17 07:41:06
Context
2016-08-17
07:42
added cairo check-in: 5dd4463993 user: ovenpasta@pizzahack.eu tags: trunk
07:41
nanomsg error handling and other fixes check-in: 590cb5e905 user: ovenpasta@pizzahack.eu tags: trunk
06:26
added nanomsg check-in: bac484c8c0 user: ovenpasta@pizzahack.eu tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to nanomsg.sls.

1
2
3
4
5
6
7
8
9
10
11
12
13
...
195
196
197
198
199
200
201









202
203
204
205
206
207
208
209
...
297
298
299
300
301
302
303
304
305







306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
#!r6rs
(library 
 (nanomsg) 
 (export 
  nanomsg-library-init
  nn-error nn-bind nn-send nn-recv nn-connect nn-poll nn-close
  nn-socket nn-assert nn-shutdown nn-freemsg nn-recvmsg nn-sendmsg
  nn-strerror nn-setsockopt nn-setsockopt/int
  nn-getsockopt nn-get-statistic nn-device nn-symbol

  NN_MSG

  NN_SOCKADDR_MAX
................................................................................
			     (syntax->datum #'name)) "-ft")))] )
	 #`(begin
	     (define (name arg-name ...) 
	       (define-ftype function-ftype (function (arg-type ...) ret-type))
	       (let* ([function-fptr  (make-ftype-pointer function-ftype c-name)]
		      [function       (ftype-ref function-ftype () function-fptr)])
		 (let ([result (function arg-name ...)])









		   result)))))])))
 
 
 ;; (nn-error NAME OFFSET) binds NAME as an identifier macro: every
 ;; reference to NAME expands to the expression (+ 156384712 OFFSET).
 (define-syntax nn-error
   (syntax-rules ()
     [(_ id offset)
      (define-syntax id
        (identifier-syntax (+ 156384712 offset)))]))
 
................................................................................

 ;; Foreign bindings produced by define-nn-func.  Each form names the C
 ;; return type, the Scheme procedure to define, the (name type) argument
 ;; list and the C symbol to call; the generated procedure calls the C
 ;; function and returns its result as is.

 ;; nn_bind(s, addr)
 (define-nn-func int nn-bind ((s int) (addr string)) "nn_bind")

 ;; nn_connect(s, addr)
 (define-nn-func int nn-connect ((s int) (addr string)) "nn_connect")

 ;; nn_shutdown(s, how)
 (define-nn-func int nn-shutdown ((s int) (how int)) "nn_shutdown")

 ;; nn_send(s, buf, len, flags); buf is passed with the u8* argument type.
 (define-nn-func int nn-send ((s int) (buf u8*) (len size_t) (flags int))
   "nn_send")








 ;; Raw binding for nn_recv(s, buf, len, flags); buf is passed as void*.
 ;; NOTE(review): the trailing % suggests this is the low-level primitive
 ;; behind the exported nn-recv -- confirm against the rest of the library.
 (define-nn-func int nn-recv% ((s int) (buf void*) (len size_t) (flags int))
   "nn_recv")

 ;; Copy a NUL-terminated C string into a fresh Scheme string.
 ;;   fptr  - ftype pointer indexed as an array of char.
 ;;   bytes - optional limit N.  Copying stops at index i as soon as
 ;;           i+1 >= N, so at most N-1 characters are returned.
 ;; Returns the characters before the terminating NUL (or before the
 ;; limit, whichever comes first).
 ;;
 ;; Fix: `bytes` is a rest list and therefore always true, even when it is
 ;; '(); the old test then evaluated (car '()) and raised for every
 ;; non-empty string read without a limit.  The limit is now consulted
 ;; only when one was actually supplied.
 (define (char*->string fptr . bytes)
   (let f ([i 0])
     (let ([c (ftype-ref char () fptr i)])
       (if (or (char=? c #\nul)
               (and (pair? bytes) (>= (+ 1 i) (car bytes))))
           (make-string i)
           ;; Recurse first so the string is allocated once, at its final
           ;; length, then fill it in on the way back.
           (let ([str (f (fx+ i 1))])
             (string-set! str i c)
             str)))))

 (define (char*->bytevector fptr bytes)
   (let f ([i 0])
     (let ([c (ftype-ref char () fptr i)])
       (if (>= i  bytes)
	   (make-bytevector i)
	   (let ([bb (f (fx+ i 1))])





|







 







>
>
>
>
>
>
>
>
>
|







 







|

>
>
>
>
>
>
>




|
|
|
|
|
|
|
|







1
2
3
4
5
6
7
8
9
10
11
12
13
...
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
...
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!r6rs
(library 
 (nanomsg) 
 (export 
  nanomsg-library-init
  nn-errno nn-strerror  nn-bind nn-send nn-recv nn-connect nn-poll nn-close
  nn-socket nn-assert nn-shutdown nn-freemsg nn-recvmsg nn-sendmsg
  nn-strerror nn-setsockopt nn-setsockopt/int
  nn-getsockopt nn-get-statistic nn-device nn-symbol

  NN_MSG

  NN_SOCKADDR_MAX
................................................................................
			     (syntax->datum #'name)) "-ft")))] )
	 #`(begin
	     (define (name arg-name ...) 
	       (define-ftype function-ftype (function (arg-type ...) ret-type))
	       (let* ([function-fptr  (make-ftype-pointer function-ftype c-name)]
		      [function       (ftype-ref function-ftype () function-fptr)])
		 (let ([result (function arg-name ...)])
		   #,(if (and (eq? (datum ret-type) 'int) 
			      (not (eq? (datum name) 'nn-errno)))
			 #'(if (< result 0)
			       (let ([errno (nn-errno)])
				 (if (= errno EAGAIN) 
				     #f 
				     (errorf 'name "returned error ~d: ~d"
					     errno (nn-strerror errno))))
			       result)
			 #'result))))))])))
 
 
 ;; (nn-error NAME OFFSET) binds NAME as an identifier macro: every
 ;; reference to NAME expands to the expression (+ 156384712 OFFSET).
 (define-syntax nn-error
   (syntax-rules ()
     [(_ id offset)
      (define-syntax id
        (identifier-syntax (+ 156384712 offset)))]))
 
................................................................................

 ;; Foreign bindings produced by define-nn-func.  Each form names the C
 ;; return type, the Scheme procedure to define, the (name type) argument
 ;; list and the C symbol to call.  Because the return type is int, the
 ;; generated procedure turns a negative result into #f when nn-errno is
 ;; EAGAIN and into an errorf call otherwise; non-negative results are
 ;; returned unchanged.

 ;; nn_bind(s, addr)
 (define-nn-func int nn-bind ((s int) (addr string)) "nn_bind")

 ;; nn_connect(s, addr)
 (define-nn-func int nn-connect ((s int) (addr string)) "nn_connect")

 ;; nn_shutdown(s, how)
 (define-nn-func int nn-shutdown ((s int) (how int)) "nn_shutdown")

 ;; nn_send(s, buf, len, flags); low-level primitive wrapped by nn-send.
 (define-nn-func int nn-send% ((s int) (buf u8*) (len size_t) (flags int))
   "nn_send")

 ;; Send the whole bytevector BUF on socket S with the given FLAGS.
 ;; Returns the number of bytes sent (always the length of BUF), or #f
 ;; when the underlying call reported EAGAIN.  Raises via errorf when
 ;; fewer bytes than requested were sent.
 ;;
 ;; Fix: nn-send% returns #f on EAGAIN (the int-result convention of
 ;; define-nn-func); the old code passed that #f straight to `=`, which
 ;; raised an unrelated type error.  #f is now passed through instead.
 (define (nn-send s buf flags)
   (let* ([len (bytevector-length buf)]
          [r (nn-send% s buf len flags)])
     (cond
      [(not r) #f]
      [(= r len) r]
      [else (errorf 'nn-send "bytes sent ~d/~d" r len)])))

 ;; Raw binding for nn_recv(s, buf, len, flags); buf is passed as void*.
 ;; As an int-returning binding it yields #f on EAGAIN and raises on any
 ;; other negative result.
 ;; NOTE(review): the trailing % suggests this is the low-level primitive
 ;; behind the exported nn-recv -- confirm against the rest of the library.
 (define-nn-func int nn-recv% ((s int) (buf void*) (len size_t) (flags int))
   "nn_recv")

 ;; (define (char*->string fptr . bytes)
 ;;   (let f ([i 0])
 ;;     (let ([c (ftype-ref char () fptr i)])
 ;;       (if (or (char=? c #\nul) (and bytes (>= (+ 1 i) (car bytes))))
 ;; 	   (make-string i)
 ;; 	   (let ([str (f (fx+ i 1))])
 ;; 	     (string-set! str i c)
 ;; 	     str)))))

 (define (char*->bytevector fptr bytes)
   (let f ([i 0])
     (let ([c (ftype-ref char () fptr i)])
       (if (>= i  bytes)
	   (make-bytevector i)
	   (let ([bb (f (fx+ i 1))])

Changes to nanomsg/bus.

12
13
14
15
16
17
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))


(define (node argc argv)
  (define sock #f)

  (define r #f)
  (dynamic-wind 
      (lambda ()
	(set! sock (nn-socket AF_SP NN_BUS))
	(assert (>= sock 0)))
      (lambda ()
	(assert (>= (nn-bind sock (cadr argv)) 0))
	(sleep-s 1)
	(when (>= argc 2)
	      (let loop ([x 2])
		(define n (nn-connect sock (list-ref argv x)))
		(assert (>= n 0))
		(if (< (+ x 1) argc)
		    (loop (+ x 1)))))
	(sleep-s 1)
	
	(assert (>= (nn-setsockopt/int sock NN_SOL_SOCKET NN_RCVTIMEO 100) 0))

	(let* ([sz-n (string-length (car argv))]
	       [n (car argv)])
	  (printf "~d: SENDING ~d ONTO BUS~n" n n)
	  (assert (= (nn-send sock (string->utf8 n) sz-n 0) sz-n)))
	(let loop ()
	  (let* ([buf (box #t)]

		 [recv (nn-recv sock buf NN_MSG 0)])
	    (when (>= recv 0)
		  (printf "~d RECEIVED ~d FROM BUS~n" 
			  (car argv) (utf8->string (unbox buf))))
	    (loop))))
      (lambda ()
	(if sock (set! r (nn-shutdown sock 0))))
  r)

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (>= argc 2) 
       (node argc argv))]







>



|
<

|




|




|

<
|

|


>
|





|
|







12
13
14
15
16
17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))


;; Run one node of the bus example.
;;   argv - (node-name bind-url peer-url ...)
;;   argc - length of argv
;; Binds a NN_BUS socket to bind-url, connects it to every peer-url,
;; sends the node name once and then prints whatever arrives, forever.
;; All endpoints created here are shut down when control leaves the body.
;;
;; Fixes:
;;  * peers start at index 2, so they exist only when argc > 2; the old
;;    (>= argc 2) test ran (list-ref argv 2) out of range when exactly
;;    two arguments were given;
;;  * the endpoint id returned by nn-bind is now recorded too, so the
;;    bound endpoint is shut down along with the connected ones;
;;  * the unused local `r` is gone;
;;  * a #f receive result is skipped instead of being compared with >=.
;;    NOTE(review): int bindings return #f on EAGAIN; presumably nn-recv
;;    can propagate that -- confirm against its definition.
(define (node argc argv)
  (define sock #f)
  (define eids '())                     ; endpoint ids to shut down on exit
  (dynamic-wind
      (lambda ()
        (set! sock (nn-socket AF_SP NN_BUS)))
      (lambda ()
        (set! eids (cons (nn-bind sock (cadr argv)) eids))
        (sleep-s 1)
        (when (> argc 2)
          (let loop ([x 2])
            (set! eids (cons (nn-connect sock (list-ref argv x)) eids))
            (when (< (+ x 1) argc)
              (loop (+ x 1)))))
        (sleep-s 1)

        (nn-setsockopt/int sock NN_SOL_SOCKET NN_RCVTIMEO 100)

        (let ([n (car argv)])
          (printf "~d: SENDING ~d ONTO BUS~n" n n)
          (nn-send sock (string->utf8 n) 0))
        (let loop ()
          (let* ([buf (box #t)]
                 ;; A receive timeout is not an error here: map it to -1.
                 [recv (guard (x [(= (nn-errno) ETIMEDOUT) -1])
                         (nn-recv sock buf NN_MSG 0))])
            (when (and recv (>= recv 0))
              (printf "~d RECEIVED ~d FROM BUS~n"
                      (car argv) (utf8->string (unbox buf))))
            (loop))))
      (lambda ()
        (for-each (lambda (eid)
                    (nn-shutdown sock eid))
                  eids))))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (>= argc 2) 
       (node argc argv))]

Changes to nanomsg/pair.

11
12
13
14
15
16
17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

;; NOTE(review): presumably loads/initialises the nanomsg foreign library;
;; the procedure is exported by the (nanomsg) library -- confirm there.
(nanomsg-library-init)

;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")

;; Print a trace line and send NAME, UTF-8 encoded, on SOCK.
;; Returns whatever nn-send returns.
;;
;; Fix: the length handed to nn-send must be the size of the encoded
;; payload in bytes.  The old code passed (string-length name), which is
;; a character count and is too small for any non-ASCII name.
(define (send-name sock name)
  (let* ([payload (string->utf8 name)]
         [sz-n (bytevector-length payload)])
    (printf "~s: SENDING '~s'~n" name name)
    (nn-send sock payload sz-n 0)))

;; Receive one message on SOCK into a fresh box and, when the result is
;; positive, print the boxed bytes decoded as UTF-8, tagged with NAME.
;; Returns the nn-recv result.
(define (recv-name sock name)
  (define buf (box #t))
  (define result (nn-recv sock buf NN_MSG 0))
  (if (> result 0)
      (printf "~d: RECEIVED '~d'~n" name (utf8->string (unbox buf))))
  result)

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))
	

;; Set the receive-timeout option of SOCK to 100, then alternate forever:
;; try to receive, wait one second, send NAME.  Never returns normally.
(define (send-recv sock name)
  (assert (>= (nn-setsockopt/int sock NN_SOL_SOCKET NN_RCVTIMEO 100) 0))
  (do () (#f)
    (recv-name sock name)
    (sleep-s 1)
    (send-name sock name)))
	
;; Bind a NN_PAIR socket to URL and run the send/receive loop as node0.
;;
;; Fix: nn-shutdown expects the endpoint id returned by nn-bind; the old
;; code always passed 0.  The id is now kept and used for shutdown.
(define (node0 url)
  (define sock (nn-socket AF_SP NN_PAIR))
  (assert (>= sock 0))
  (let ([eid (nn-bind sock url)])
    (assert (>= eid 0))
    (send-recv sock node0-name)
    (nn-shutdown sock eid)))

;; Connect a NN_PAIR socket to URL and run the send/receive loop as node1.
;;
;; Fix: nn-shutdown expects the endpoint id returned by nn-connect; the
;; old code always passed 0.  The id is now kept and used for shutdown.
(define (node1 url)
  (define sock (nn-socket AF_SP NN_PAIR))
  (assert (>= sock 0))
  (let ([eid (nn-connect sock url)])
    (assert (>= eid 0))
    (send-recv sock node1-name)
    (nn-shutdown sock eid)))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]







<

|



>
|







<

|
<








<
|

|



<
|

|







11
12
13
14
15
16
17

18
19
20
21
22
23
24
25
26
27
28
29
30
31

32
33

34
35
36
37
38
39
40
41

42
43
44
45
46
47

48
49
50
51
52
53
54
55
56
57

;; NOTE(review): presumably loads/initialises the nanomsg foreign library;
;; the procedure is exported by the (nanomsg) library -- confirm there.
(nanomsg-library-init)

;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")

;; Print a trace line, then send NAME encoded as UTF-8 on SOCK with
;; flags 0.  Returns the nn-send result.
(define send-name
  (lambda (sock name)
    (printf "~s: SENDING '~s'~n" name name)
    (let ([payload (string->utf8 name)])
      (nn-send sock payload 0))))

;; Receive one message on SOCK into a fresh box and, when bytes arrived,
;; print them decoded as UTF-8, tagged with NAME.
;; Returns the nn-recv result; a condition raised while nn-errno equals
;; ETIMEDOUT is turned into -1.
;;
;; Robustness: int-returning bindings yield #f on EAGAIN, and `>` raises
;; on #f.  The result is therefore checked for #f before being compared.
;; NOTE(review): assumes nn-recv can propagate that #f -- confirm against
;; its definition in the (nanomsg) library.
(define (recv-name sock name)
  (let* ([buf (box #t)]
         [result (guard (x [(= (nn-errno) ETIMEDOUT) -1])
                   (nn-recv sock buf NN_MSG 0))])
    (when (and result (> result 0))
      (printf "~d: RECEIVED '~d'~n" name (utf8->string (unbox buf))))
    result))

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))
	

;; Set the receive-timeout option of SOCK to 100, then alternate forever:
;; try to receive, wait one second, send NAME.  Never returns normally.
(define (send-recv sock name)
  (nn-setsockopt/int sock NN_SOL_SOCKET NN_RCVTIMEO 100)
  (do () (#f)
    (recv-name sock name)
    (sleep-s 1)
    (send-name sock name)))
	
;; Bind a NN_PAIR socket to URL, run the send/receive loop as node0 and
;; shut the bound endpoint down afterwards.
(define (node0 url)
  (let* ([sock (nn-socket AF_SP NN_PAIR)]
         [eid (nn-bind sock url)])
    (send-recv sock node0-name)
    (nn-shutdown sock eid)))

;; Connect a NN_PAIR socket to URL, run the send/receive loop as node1 and
;; shut the connected endpoint down afterwards.
(define (node1 url)
  (let* ([sock (nn-socket AF_SP NN_PAIR)]
         [eid (nn-connect sock url)])
    (send-recv sock node1-name)
    (nn-shutdown sock eid)))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]

Changes to nanomsg/pipeline.

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
;; NOTE(review): presumably loads/initialises the nanomsg foreign library;
;; the procedure is exported by the (nanomsg) library -- confirm there.
(nanomsg-library-init)

;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")

;; Pull side of the pipeline: bind a NN_PULL socket to URL and print each
;; received message (decoded as UTF-8) forever.  Asserts that the socket,
;; the bind and every receive return a non-negative value.
(define (node0 url)
  (define sock (nn-socket AF_SP NN_PULL))
  (assert (>= sock 0))
  (assert (>= (nn-bind sock url) 0))
  (do () (#f)
    (let* ([buf (box #t)]
           [bytes (nn-recv sock buf NN_MSG 0)])
      (assert (>= bytes 0))
      (printf "NODE0: RECEIVED '~d'~n" (utf8->string (unbox buf))))))


;; Push side of the pipeline: connect a NN_PUSH socket to URL, send MSG
;; (UTF-8 encoded) once and shut the endpoint down.
;; Returns the nn-shutdown result.
;;
;; Fixes:
;;  * the length given to nn-send is now the byte length of the encoded
;;    payload; (string-length msg) counts characters and is too small for
;;    non-ASCII text, which also made the size assertion fail;
;;  * nn-shutdown receives the endpoint id returned by nn-connect instead
;;    of a literal 0.
(define (node1 url msg)
  (define payload (string->utf8 msg))
  (define sz-msg (bytevector-length payload))
  (define sock (nn-socket AF_SP NN_PUSH))
  (assert (>= sock 0))
  (let ([eid (nn-connect sock url)])
    (assert (>= eid 0))
    (printf "NODE1: SENDING '~d'~n" msg)
    (let ([bytes (nn-send sock payload sz-msg 0)])
      (assert (= bytes sz-msg))
      (nn-shutdown sock eid))))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]







<
|



<





<

<
|

|
<
|







12
13
14
15
16
17
18

19
20
21
22

23
24
25
26
27

28

29
30
31

32
33
34
35
36
37
38
39
;; NOTE(review): presumably loads/initialises the nanomsg foreign library;
;; the procedure is exported by the (nanomsg) library -- confirm there.
(nanomsg-library-init)

;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")

;; Pull side of the pipeline: bind a NN_PULL socket to URL and print each
;; received message (decoded as UTF-8) forever.
(define (node0 url)
  (let ([sock (nn-socket AF_SP NN_PULL)])
    (nn-bind sock url)
    (do () (#f)
      (let ([buf (box #t)])
        (nn-recv sock buf NN_MSG 0)
        (printf "NODE0: RECEIVED '~d'~n" (utf8->string (unbox buf)))))))


;; Push side of the pipeline: connect a NN_PUSH socket to URL, send MSG
;; (UTF-8 encoded) once, then shut the connected endpoint down.
;; Returns the nn-shutdown result.
(define (node1 url msg)
  (let* ([sock (nn-socket AF_SP NN_PUSH)]
         [eid (nn-connect sock url)])
    (printf "NODE1: SENDING '~d'~n" msg)
    (nn-send sock (string->utf8 msg) 0)
    (nn-shutdown sock eid)))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]

Changes to nanomsg/pubsub.

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
;; Alias: `date` is the same procedure as date-and-time; the server calls
;; it to obtain the string it publishes.
(define date date-and-time)

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))

;; Publisher: bind a NN_PUB socket to URL and publish the current date
;; string once per second, forever.  Every send is asserted to have sent
;; as many bytes as the string has characters.  The trailing shutdown is
;; only reached if the loop is ever left.
(define (server url)
  (define sock (nn-socket AF_SP NN_PUB))
  (assert (>= sock 0))
  (assert (>= (nn-bind sock url) 0))
  (do () (#f)
    (let* ([d (date)]
           [sz-d (string-length d)])
      (printf "SERVER: PUBLISHING DATE ~d~n" d)
      (assert (= (nn-send sock (string->utf8 d) sz-d 0) sz-d))
      (sleep-s 1)))
  (nn-shutdown sock 0))

;; Subscriber: open a NN_SUB socket, set NN_SUB_SUBSCRIBE, connect to URL
;; and print every received message (decoded as UTF-8), tagged with NAME,
;; forever.
;;
;; Fix: the subscribe check was written (>= X) with a single operand, so
;; it never compared the result with anything and a failed nn-setsockopt
;; went unnoticed.  It now compares against 0 like the other checks.
(define (client url name)
  (define sock (nn-socket AF_SP NN_SUB))
  (assert (>= sock 0))
  (assert (>= (nn-setsockopt sock NN_SUB NN_SUB_SUBSCRIBE 0 0) 0))
  (assert (>= (nn-connect sock url) 0))
  (let loop ()
    (let* ([buf (box #t)]
           [bytes (nn-recv sock buf NN_MSG 0)])
      (assert (>= bytes 0))
      (printf "CLIENT (~d): RECEIVED ~d~n" name (utf8->string (unbox buf)))
      (loop)))
  (nn-shutdown sock 0))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? server-name (car argv)))
  (server (cadr argv))]







<
|

|
<

|


|



|
|
|



<


|







16
17
18
19
20
21
22

23
24
25

26
27
28
29
30
31
32
33
34
35
36
37
38
39

40
41
42
43
44
45
46
47
48
49
;; Alias: `date` is the same procedure as date-and-time; the server calls
;; it to obtain the string it publishes.
(define date date-and-time)

;; Pause for SEC seconds: builds a time-duration with a 0 nanosecond part
;; and SEC seconds, then hands it to `sleep`.
(define sleep-s
  (lambda (sec)
    (let ([duration (make-time 'time-duration 0 sec)])
      (sleep duration))))

;; Publisher: bind a NN_PUB socket to URL and publish the current date
;; string once per second, forever.  The trailing shutdown of the bound
;; endpoint is only reached if the loop is ever left.
(define (server url)
  (let* ([sock (nn-socket AF_SP NN_PUB)]
         [eid (nn-bind sock url)])
    (do () (#f)
      (let ([d (date)])
        (printf "SERVER: PUBLISHING DATE ~d~n" d)
        (nn-send sock (string->utf8 d) 0)
        (sleep-s 1)))
    (nn-shutdown sock eid)))

;; Subscriber: open a NN_SUB socket, set NN_SUB_SUBSCRIBE, connect to URL
;; and print every received message (decoded as UTF-8), tagged with NAME,
;; forever.  The trailing shutdown is only reached if the loop is left.
(define (client url name)
  (let ([sock (nn-socket AF_SP NN_SUB)])
    (nn-setsockopt sock NN_SUB NN_SUB_SUBSCRIBE 0 0)
    (let ([eid (nn-connect sock url)])
      (do () (#f)
        (let ([buf (box #t)])
          (nn-recv sock buf NN_MSG 0)
          (printf "CLIENT (~d): RECEIVED ~d~n" name (utf8->string (unbox buf)))))
      (nn-shutdown sock eid))))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? server-name (car argv)))
  (server (cadr argv))]

Changes to nanomsg/reqrep.

14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")
;; Request payload: node1 sends it and node0 answers when it receives it.
(define date-name "DATE")

;; Alias: `date` is the same procedure as date-and-time; node0 calls it to
;; build its reply.
(define date date-and-time)

;; Reply side: bind a NN_REP socket to URL and serve forever.  Each
;; received message is decoded as UTF-8; when it equals date-name the
;; current date string is sent back and the send size is asserted.
;; The trailing shutdown is only reached if the loop is ever left.
(define (node0 url)
  (define sock (nn-socket AF_SP NN_REP))
  (assert (>= sock 0))
  (assert (>= (nn-bind sock url) 0))
  (do () (#f)
    (let* ([buf (box #t)]
           [bytes (nn-recv sock buf NN_MSG 0)])
      (assert (>= bytes 0))
      (if (string=? date-name (utf8->string (unbox buf)))
          (let* ([d (date)]
                 [sz-d (string-length d)])
            (printf "NODE0: RECEIVED DATE REQUEST~n")
            (printf "NODE0: SENDING DATE ~d~n" d)
            (assert (= (nn-send sock (string->utf8 d) sz-d 0)
                       sz-d))))))
  (nn-shutdown sock 0))

;; Request side: connect a NN_REQ socket to URL, send date-name, wait for
;; one reply and print it decoded as UTF-8.  Socket creation, connect,
;; send size and receive result are all asserted.
;; Returns the nn-shutdown result.
(define (node1 url)
  (define sz-date (string-length date-name))
  (define sock (nn-socket AF_SP NN_REQ))
  (assert (>= sock 0))
  (assert (>= (nn-connect sock url) 0))
  (printf "NODE1: SENDING DATE REQUEST '~d'~n" date-name)
  (let ([bytes (nn-send sock (string->utf8 date-name) sz-date 0)])
    (assert (= bytes sz-date)))
  (let* ([buf (box #t)]
         [bytes (nn-recv sock buf NN_MSG 0)])
    (assert (>= bytes 0))
    (printf "NODE1: RECEIVED DATE ~d~n" (utf8->string (unbox buf))))
  (nn-shutdown sock 0))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]







<

<
|



<

|
<


|
<

|


<

<
|

|
|
<
|
<
|
|







14
15
16
17
18
19
20

21

22
23
24
25

26
27

28
29
30

31
32
33
34

35

36
37
38
39

40

41
42
43
44
45
46
47
48
49
;; Role names.  node0-name is compared with the first command-line
;; argument further down; node1-name is presumably used the same way.
(define node0-name "node0")
(define node1-name "node1")
;; Request payload: node1 sends it and node0 answers when it receives it.
(define date-name "DATE")

;; Alias: `date` is the same procedure as date-and-time; node0 calls it to
;; build its reply.
(define date date-and-time)

;; Reply side: bind a NN_REP socket to URL and serve forever.  Each
;; received message is decoded as UTF-8; when it equals date-name the
;; current date string is sent back.  The trailing shutdown of the bound
;; endpoint is only reached if the loop is ever left.
(define (node0 url)
  (let* ([sock (nn-socket AF_SP NN_REP)]
         [eid (nn-bind sock url)])
    (do () (#f)
      (let ([buf (box #t)])
        (nn-recv sock buf NN_MSG 0)
        (if (string=? date-name (utf8->string (unbox buf)))
            (let ([d (date)])
              (printf "NODE0: RECEIVED DATE REQUEST~n")
              (printf "NODE0: SENDING DATE ~d~n" d)
              (nn-send sock (string->utf8 d) 0)))))
    (nn-shutdown sock eid)))

;; Request side: connect a NN_REQ socket to URL, send date-name, wait for
;; one reply, print it decoded as UTF-8 and shut the endpoint down.
;; Returns the nn-shutdown result.
(define (node1 url)
  (let* ([sock (nn-socket AF_SP NN_REQ)]
         [eid (nn-connect sock url)]
         [buf (box #t)])
    (printf "NODE1: SENDING DATE REQUEST '~d'~n" date-name)
    (nn-send sock (string->utf8 date-name) 0)
    (nn-recv sock buf NN_MSG 0)
    (printf "NODE1: RECEIVED DATE ~d~n" (utf8->string (unbox buf)))
    (nn-shutdown sock eid)))

(define argv (command-line-arguments))
(define argc (length argv))

(cond
 [(and (> argc 1) (string=? node0-name (car argv)))
  (node0 (cadr argv))]